This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new f8be1cae fix: grpc proxyless xds connection and test example (#809)
f8be1cae is described below
commit f8be1cae86c735c5bf26cb4903bb538e51982507
Author: Jian Zhong <[email protected]>
AuthorDate: Fri Nov 7 12:42:27 2025 +0800
fix: grpc proxyless xds connection and test example (#809)
---
README.md | 8 +-
pkg/dubbo-agent/xds_proxy.go | 51 ++-
pkg/xds/server.go | 25 ++
sail/pkg/model/context.go | 17 +-
sail/pkg/model/endpointshards.go | 29 +-
sail/pkg/model/service.go | 32 +-
sail/pkg/model/typed_xds_cache.go | 19 +-
sail/pkg/networking/grpcgen/cds.go | 16 +
sail/pkg/networking/grpcgen/grpcgen.go | 18 +-
sail/pkg/networking/grpcgen/lds.go | 329 +++++++++++++++--
sail/pkg/networking/grpcgen/rds.go | 94 ++++-
sail/pkg/networking/util/util.go | 3 +-
.../serviceregistry/kube/controller/controller.go | 88 ++++-
.../kube/controller/endpointslice.go | 193 +++++++++-
sail/pkg/xds/ads.go | 48 ++-
sail/pkg/xds/discovery.go | 9 +
sail/pkg/xds/eds.go | 66 +++-
sail/pkg/xds/endpoints/endpoint_builder.go | 63 +++-
sail/pkg/xds/lds.go | 10 +-
sail/pkg/xds/xdsgen.go | 363 ++++++++++++++++++-
samples/example/README.md | 1 -
samples/grpc-proxyless/grpc-proxyless.yaml | 107 ++++++
test/grpc-proxyless/.dockerignore | 6 +
test/grpc-proxyless/.gitignore | 2 +
test/grpc-proxyless/Dockerfile.consumer | 35 ++
test/grpc-proxyless/Dockerfile.producer | 35 ++
test/grpc-proxyless/README.md | 12 +
test/grpc-proxyless/consumer/main.go | 135 +++++++
test/grpc-proxyless/generate-proto.sh | 12 +
test/grpc-proxyless/go.mod | 29 ++
test/grpc-proxyless/go.sum | 76 ++++
test/grpc-proxyless/producer/main.go | 401 +++++++++++++++++++++
test/grpc-proxyless/proto/echo.proto | 42 +++
test/grpc-proxyless/proto/gen.sh | 7 +
test/grpc-proxyless/test-commands.sh | 49 +++
test/grpc-proxyless/test.sh | 56 +++
36 files changed, 2382 insertions(+), 104 deletions(-)
diff --git a/README.md b/README.md
index 44663ae1..fc5687a4 100644
--- a/README.md
+++ b/README.md
@@ -13,9 +13,9 @@ The main repositories of Dubbo on Kubernetes include:
 - **dubboctl** — The command-line management tool that provides control plane management, development framework scaffolding, and application deployment.
 - **dubbod** — The dubbo control plane. It is built on Istio to implement a proxyless service mesh and includes the following components:
-  - **Sail** - (under development): Runtime proxy configuration.
-  - **Aegis** - (under development): Certificate issuance and rotation.
-  - **Gear** - (under development): Validation, aggregation, transformation, and distribution of Dubbo configuration.
+  - **sail** - Runtime proxy configuration.
+  - **aegis** - Certificate issuance and rotation.
+  - **gear** - Validation, aggregation, transformation, and distribution of Dubbo configuration.
 - **operator**: Provides user-friendly options to operate the Dubbo proxyless service mesh.
## Quick Start
@@ -25,4 +25,4 @@ Please refer to [official website](https://cn.dubbo.apache.org/zh-cn/overview/ho
 Refer to [CONTRIBUTING.md](https://github.com/apache/dubbo-kubernetes/blob/master/CONTRIBUTING.md)
 ## License
-Apache License 2.0, see [LICENSE](https://github.com/apache/dubbo-kubernetes/blob/master/LICENSE).
\ No newline at end of file
+Apache License 2.0, see [LICENSE](https://github.com/apache/dubbo-kubernetes/blob/master/LICENSE).
diff --git a/pkg/dubbo-agent/xds_proxy.go b/pkg/dubbo-agent/xds_proxy.go
index 66917adb..7a5cd2dc 100644
--- a/pkg/dubbo-agent/xds_proxy.go
+++ b/pkg/dubbo-agent/xds_proxy.go
@@ -163,6 +163,9 @@ func (p *XdsProxy) handleUpstream(ctx context.Context, con *ProxyConnection, xds
             klog.Infof("XDS proxy: connection #%d received first response from upstream: TypeUrl=%s, Resources=%d",
                 con.conID, model.GetShortType(resp.TypeUrl), len(resp.Resources))
             firstResponse = false
+        } else {
+            klog.V(3).Infof("XDS proxy: connection #%d received response from upstream: TypeUrl=%s, Resources=%d, VersionInfo=%s",
+                con.conID, model.GetShortType(resp.TypeUrl), len(resp.Resources), resp.VersionInfo)
         }
select {
case con.responsesChan <- resp:
@@ -202,7 +205,7 @@ func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
             return
         }
-        klog.V(2).Infof("XDS proxy: connection #%d received request: TypeUrl=%s, Node=%v, ResourceNames=%d",
+        klog.V(3).Infof("XDS proxy: connection #%d received request: TypeUrl=%s, Node=%v, ResourceNames=%d",
             con.conID, model.GetShortType(req.TypeUrl), req.Node != nil, len(req.ResourceNames))
         // Save Node from first request that contains it
@@ -242,10 +245,26 @@ func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
         }
         // Ensure Node is set in request before forwarding
+        // For proxyless gRPC, we must ensure Node is always set in every request
         con.nodeMutex.RLock()
-        if con.node != nil && req.Node == nil {
-            req.Node = con.node
-            klog.V(2).Infof("XDS proxy: connection #%d added saved Node to request", con.conID)
+        if con.node != nil {
+            if req.Node == nil {
+                // Deep copy Node to avoid race conditions
+                req.Node = &core.Node{
+                    Id:       con.node.Id,
+                    Cluster:  con.node.Cluster,
+                    Locality: con.node.Locality,
+                    Metadata: con.node.Metadata,
+                }
+                klog.V(2).Infof("XDS proxy: connection #%d added saved Node to request", con.conID)
+            } else if req.Node.Id == "" {
+                // If Node exists but Id is empty, copy from saved node
+                req.Node.Id = con.node.Id
+                if req.Node.Metadata == nil {
+                    req.Node.Metadata = con.node.Metadata
+                }
+                klog.V(2).Infof("XDS proxy: connection #%d filled empty Node.Id in request", con.conID)
+            }
         }
         con.nodeMutex.RUnlock()
@@ -314,9 +333,24 @@ func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
         }
         // Ensure Node is set before sending to upstream
+        // For proxyless gRPC, we must ensure Node is always set in every request
         con.nodeMutex.RLock()
-        if con.node != nil && req.Node == nil {
-            req.Node = con.node
+        if con.node != nil {
+            if req.Node == nil {
+                // Deep copy Node to avoid race conditions
+                req.Node = &core.Node{
+                    Id:       con.node.Id,
+                    Cluster:  con.node.Cluster,
+                    Locality: con.node.Locality,
+                    Metadata: con.node.Metadata,
+                }
+            } else if req.Node.Id == "" {
+                // If Node exists but Id is empty, copy from saved node
+                req.Node.Id = con.node.Id
+                if req.Node.Metadata == nil {
+                    req.Node.Metadata = con.node.Metadata
+                }
+            }
         }
         con.nodeMutex.RUnlock()
@@ -412,12 +446,15 @@ func (p *XdsProxy) handleUpstreamResponse(con *ProxyConnection) {
         }
         // Forward all non-internal responses to downstream (gRPC client)
-        // This is critical for proxyless gRPC clients to receive XDS configuration
+        klog.V(3).Infof("XDS proxy: connection #%d forwarding response to downstream: TypeUrl=%s, Resources=%d, VersionInfo=%s",
+            con.conID, model.GetShortType(resp.TypeUrl), len(resp.Resources), resp.VersionInfo)
         if err := con.downstream.Send(resp); err != nil {
             klog.Errorf("XDS proxy: connection #%d failed to send response to downstream: %v", con.conID, err)
             downstreamErr(con, err)
             return
         }
+        klog.V(3).Infof("XDS proxy: connection #%d successfully forwarded response to downstream: TypeUrl=%s, Resources=%d",
+            con.conID, model.GetShortType(resp.TypeUrl), len(resp.Resources))
         // Send ACK for normal XDS responses (if not already sent by handler)
         if !hasHandler {
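
The xds_proxy.go change above guarantees that every DiscoveryRequest forwarded upstream carries node identity: a request without a Node gets a copy of the one saved from the first request, and a Node with an empty Id gets the Id (and missing metadata) filled in. A minimal sketch of that rule, using simplified stand-in types instead of the real go-control-plane protos:

    package main

    import "fmt"

    // Simplified stand-ins for the Envoy core.Node / DiscoveryRequest protos.
    type node struct {
        ID       string
        Cluster  string
        Metadata map[string]string
    }

    type discoveryRequest struct {
        TypeURL string
        Node    *node
    }

    // ensureNode mirrors the proxy logic: copy the saved node into requests that
    // lack one, and fill in an empty Id/Metadata on requests with a partial node.
    func ensureNode(req *discoveryRequest, saved *node) {
        if saved == nil {
            return
        }
        switch {
        case req.Node == nil:
            // Copy field by field instead of sharing the saved pointer,
            // so writes to the request cannot race on the saved node.
            req.Node = &node{ID: saved.ID, Cluster: saved.Cluster, Metadata: saved.Metadata}
        case req.Node.ID == "":
            req.Node.ID = saved.ID
            if req.Node.Metadata == nil {
                req.Node.Metadata = saved.Metadata
            }
        }
    }

    func main() {
        saved := &node{ID: "sidecar~10.0.0.7~consumer-abc.default~cluster.local", Cluster: "consumer"}
        req := &discoveryRequest{TypeURL: "type.googleapis.com/envoy.config.listener.v3.Listener"}
        ensureNode(req, saved)
        fmt.Println(req.Node.ID) // every forwarded request now carries identity
    }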
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index 9446d8aa..b3282211 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -322,6 +322,31 @@ func ShouldRespond(w Watcher, id string, request *discovery.DiscoveryRequest) (b
     removed := previousResources.Difference(cur)
     added := cur.Difference(previousResources)
+    // CRITICAL FIX: For proxyless gRPC, if client sends wildcard (empty ResourceNames) after receiving specific resources,
+    // this is likely an ACK and we should NOT push all resources again
+    // Check if this is a wildcard request after specific resources were sent
+    if len(request.ResourceNames) == 0 && len(previousResources) > 0 && previousInfo.NonceSent != "" {
+        // This is a wildcard request after specific resources were sent
+        // For proxyless gRPC clients, this should be treated as ACK, not a request for all resources
+        // The client already has the resources from the previous push
+        // Check if this is proxyless by attempting to get the proxy from watcher
+        // For now, we'll check if this is a proxyless scenario by the context
+        // If previous resources existed and client sends empty names with matching nonce, it's an ACK
+        klog.V(2).Infof("ADS:%s: wildcard request after specific resources (prev: %d resources, nonce: %s), treating as ACK",
+            stype, len(previousResources), previousInfo.NonceSent)
+        // Update ResourceNames to keep previous resources (don't clear them)
+        w.UpdateWatchedResource(request.TypeUrl, func(wr *WatchedResource) *WatchedResource {
+            if wr == nil {
+                return nil
+            }
+            // Keep the previous ResourceNames, don't clear them with empty set
+            // The client is ACKing the previous push, not requesting all resources
+            wr.ResourceNames = previousResources
+            return wr
+        })
+        return false, emptyResourceDelta
+    }
+
     // We should always respond "alwaysRespond" marked requests to let Envoy finish warming
     // even though Nonce match and it looks like an ACK.
     if alwaysRespond {
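
The ShouldRespond change treats an empty-ResourceNames request as an ACK rather than a fresh wildcard subscription whenever specific resources were already pushed on that watcher and a nonce is outstanding. A small sketch of just that decision, with a simplified stand-in for the watched-resource bookkeeping (names here are illustrative, not the real types):

    package main

    import "fmt"

    // Simplified view of a watched resource tracked per type URL.
    type watchedResource struct {
        NonceSent     string
        ResourceNames map[string]struct{}
    }

    // treatAsAck reports whether a request with no resource names should be handled
    // as an ACK of the previous push instead of a wildcard subscription.
    func treatAsAck(requested []string, prev *watchedResource) bool {
        return len(requested) == 0 &&
            prev != nil &&
            len(prev.ResourceNames) > 0 &&
            prev.NonceSent != ""
    }

    func main() {
        prev := &watchedResource{
            NonceSent:     "nonce-42",
            ResourceNames: map[string]struct{}{"outbound|7070||consumer.default.svc.cluster.local": {}},
        }
        // Empty names after a specific push: keep the old names, skip the push.
        fmt.Println(treatAsAck(nil, prev)) // true
        // A genuinely new wildcard subscription (nothing sent yet) still gets a full push.
        fmt.Println(treatAsAck(nil, &watchedResource{})) // false
    }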
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index c907ea78..38e255c5 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -326,7 +326,7 @@ func ParseServiceNodeWithMetadata(nodeID string, metadata *NodeMetadata) (*Proxy
     }
     if len(parts) != 4 {
-        return out, fmt.Errorf("missing parts in the service node %q", nodeID)
+        return out, fmt.Errorf("missing parts in the service node %q (expected 4 parts, got %d)", nodeID, len(parts))
     }
     // Extract IP address from parts[1] (format: type~ip~id~domain)
@@ -338,13 +338,24 @@ func ParseServiceNodeWithMetadata(nodeID string, metadata *NodeMetadata) (*Proxy
         }
     }
-    // Does query from ingress or router have to carry valid IP address?
+    // If IP address is empty in node ID, we still need to validate it
+    // For proxyless gRPC, IP address should be set, but we'll allow it to be set later
+    // if it's empty, we'll set it from pod IP when ServiceTargets are computed
     if len(out.IPAddresses) == 0 {
-        return out, fmt.Errorf("no valid IP address in the service node id or metadata")
+        // IP address will be set later when ServiceTargets are computed from pod IP
+        // For now, we allow empty IP for proxyless nodes to avoid failing initialization
+        // The IP will be populated when GetProxyServiceTargets is called
+        out.IPAddresses = []string{}
     }
     out.ID = parts[2]
     out.DNSDomain = parts[3]
+
+    // Validate that ID is not empty - this is critical for proxyless gRPC
+    if len(out.ID) == 0 {
+        return out, fmt.Errorf("node ID is empty in service node %q (parts[2] is empty)", nodeID)
+    }
+
     return out, nil
 }
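
ParseServiceNodeWithMetadata now tolerates an empty IP field in the node ID (it can be filled from the pod later) but rejects an empty ID, since everything downstream is keyed on it. A sketch of the four-part `type~ip~id~domain` parsing under those rules, using a hypothetical helper rather than the real function:

    package main

    import (
        "fmt"
        "strings"
    )

    // parseNodeID splits a "type~ip~id~domain" service-node string.
    // An empty IP is tolerated (it can be filled in later from the pod),
    // but an empty ID is rejected because everything downstream keys on it.
    func parseNodeID(nodeID string) (ip, id, domain string, err error) {
        parts := strings.Split(nodeID, "~")
        if len(parts) != 4 {
            return "", "", "", fmt.Errorf("missing parts in the service node %q (expected 4, got %d)", nodeID, len(parts))
        }
        ip, id, domain = parts[1], parts[2], parts[3]
        if id == "" {
            return "", "", "", fmt.Errorf("node ID is empty in service node %q", nodeID)
        }
        return ip, id, domain, nil
    }

    func main() {
        // Proxyless gRPC pods may register before their IP is known.
        ip, id, domain, err := parseNodeID("sidecar~~consumer-abc.grpc-proxyless~cluster.local")
        fmt.Println(ip == "", id, domain, err) // true consumer-abc.grpc-proxyless cluster.local <nil>
    }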
diff --git a/sail/pkg/model/endpointshards.go b/sail/pkg/model/endpointshards.go
index 81808566..1e57f0e3 100644
--- a/sail/pkg/model/endpointshards.go
+++ b/sail/pkg/model/endpointshards.go
@@ -1,9 +1,10 @@
package model
import (
- "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"sync"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
+
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
@@ -160,8 +161,32 @@ func (e *EndpointIndex) UpdateServiceEndpoints(
     oldDubboEndpoints := ep.Shards[shard]
     newDubboEndpoints, needPush := endpointUpdateRequiresPush(oldDubboEndpoints, dubboEndpoints)
+    // CRITICAL: Log endpoint update details for debugging
+    if logPushType {
+        oldHealthyCount := 0
+        oldUnhealthyCount := 0
+        newHealthyCount := 0
+        newUnhealthyCount := 0
+        for _, ep := range oldDubboEndpoints {
+            if ep.HealthStatus == Healthy {
+                oldHealthyCount++
+            } else {
+                oldUnhealthyCount++
+            }
+        }
+        for _, ep := range newDubboEndpoints {
+            if ep.HealthStatus == Healthy {
+                newHealthyCount++
+            } else {
+                newUnhealthyCount++
+            }
+        }
+        klog.Warningf("UpdateServiceEndpoints: service=%s, shard=%v, oldEndpoints=%d (healthy=%d, unhealthy=%d), newEndpoints=%d (healthy=%d, unhealthy=%d), needPush=%v, pushType=%v",
+            hostname, shard, len(oldDubboEndpoints), oldHealthyCount, oldUnhealthyCount, len(newDubboEndpoints), newHealthyCount, newUnhealthyCount, needPush, pushType)
+    }
+
     if pushType != FullPush && !needPush {
-        klog.V(2).Infof("No push, either old endpoint health status did not change or new endpoint came with unhealthy status, %v", hostname)
+        klog.Warningf("No push, either old endpoint health status did not change or new endpoint came with unhealthy status, %v (oldEndpoints=%d, newEndpoints=%d)", hostname, len(oldDubboEndpoints), len(newDubboEndpoints))
         pushType = NoPush
     }
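
The new logging in UpdateServiceEndpoints summarizes each shard as healthy/unhealthy counts before deciding whether a push is needed. A toy version of that summary, with a simplified endpoint type standing in for DubboEndpoint:

    package main

    import "fmt"

    type healthStatus int

    const (
        healthy healthStatus = iota
        unhealthy
    )

    type endpoint struct {
        Address string
        Health  healthStatus
    }

    // countHealth summarizes a shard the way the new debug log does:
    // how many endpoints are healthy and how many are not.
    func countHealth(eps []endpoint) (h, u int) {
        for _, ep := range eps {
            if ep.Health == healthy {
                h++
            } else {
                u++
            }
        }
        return h, u
    }

    func main() {
        old := []endpoint{{"10.0.0.7:17070", unhealthy}}
        next := []endpoint{{"10.0.0.7:17070", healthy}}
        oh, ou := countHealth(old)
        nh, nu := countHealth(next)
        // A health flip like this is exactly the case that must trigger an EDS push.
        fmt.Printf("old: %d healthy/%d unhealthy, new: %d healthy/%d unhealthy\n", oh, ou, nh, nu)
    }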
diff --git a/sail/pkg/model/service.go b/sail/pkg/model/service.go
index c0527192..9a4ab916 100644
--- a/sail/pkg/model/service.go
+++ b/sail/pkg/model/service.go
@@ -155,6 +155,17 @@ func (ep *DubboEndpoint) Equals(other *DubboEndpoint) bool {
         return false
     }
+    // CRITICAL FIX: Compare HealthStatus to detect health status changes
+    // This is necessary to trigger EDS push when endpoints become healthy/unhealthy
+    if ep.HealthStatus != other.HealthStatus {
+        return false
+    }
+
+    // CRITICAL FIX: Compare EndpointPort to detect port changes
+    if ep.EndpointPort != other.EndpointPort {
+        return false
+    }
+
     return true
 }
@@ -470,13 +481,32 @@ func (s *ServiceAttributes) Equals(other *ServiceAttributes) bool {
 }
 func (s *Service) SupportsUnhealthyEndpoints() bool {
-    return false
+    // CRITICAL FIX: Return PublishNotReadyAddresses to support publishing not-ready endpoints
+    // This allows endpoints with Ready=false to be included in EDS if the service has
+    // publishNotReadyAddresses=true, which is useful for services that need to receive
+    // traffic even before they are fully ready (e.g., during startup).
+    // CRITICAL FIX: Check if s is nil before accessing Attributes
+    if s == nil {
+        return false
+    }
+    return s.Attributes.PublishNotReadyAddresses
 }
 func (s *Service) SupportsDrainingEndpoints() bool {
     return false
 }
+// GetAddressForProxy returns the primary address for a service from the proxy's perspective.
+// This is used for outbound listener addresses in gRPC proxyless mode.
+func (s *Service) GetAddressForProxy(node *Proxy) string {
+    addresses := s.getAllAddressesForProxy(node)
+    if len(addresses) > 0 {
+        return addresses[0]
+    }
+    // Default to 0.0.0.0 if no address found, which matches outbound listener behavior
+    return "0.0.0.0"
+}
+
 func (s *Service) GetExtraAddressesForProxy(node *Proxy) []string {
     addresses := s.getAllAddressesForProxy(node)
     if len(addresses) > 1 {
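
With HealthStatus and EndpointPort now part of DubboEndpoint.Equals, a readiness flip or a port change makes the old and new endpoint sets compare unequal, which is what lets the EDS push fire. A compact sketch of that comparison with stand-in types:

    package main

    import "fmt"

    type endpoint struct {
        Address      string
        EndpointPort uint32
        HealthStatus int // e.g. 0 = healthy, 1 = unhealthy
    }

    // equalForPush mirrors the tightened Equals: two endpoints are only "the same"
    // if the address, the container port, and the health status all match, so a
    // health flip or a port change is detected and triggers a push.
    func equalForPush(a, b endpoint) bool {
        return a.Address == b.Address &&
            a.EndpointPort == b.EndpointPort &&
            a.HealthStatus == b.HealthStatus
    }

    func main() {
        before := endpoint{Address: "10.0.0.7", EndpointPort: 17070, HealthStatus: 1}
        after := endpoint{Address: "10.0.0.7", EndpointPort: 17070, HealthStatus: 0}
        fmt.Println(equalForPush(before, after)) // false: readiness changed, so EDS must be re-pushed
    }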
diff --git a/sail/pkg/model/typed_xds_cache.go b/sail/pkg/model/typed_xds_cache.go
index af1aec08..883990a7 100644
--- a/sail/pkg/model/typed_xds_cache.go
+++ b/sail/pkg/model/typed_xds_cache.go
@@ -2,6 +2,9 @@ package model
import (
"fmt"
+ "sync"
+ "time"
+
"github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
@@ -9,8 +12,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/golang-lru/v2/simplelru"
"google.golang.org/protobuf/testing/protocmp"
- "sync"
- "time"
+ "k8s.io/klog/v2"
)
type CacheToken uint64
@@ -186,13 +188,20 @@ func (l *lruCache[K]) Clear(configs sets.Set[ConfigKey]) {
     defer func() {
         l.evictedOnClear = false
     }()
+    clearedCount := 0
     for ckey := range configs {
         hc := ckey.HashCode()
         referenced := l.configIndex[hc]
-        delete(l.configIndex, hc)
-        for key := range referenced {
-            l.store.Remove(key)
+        if len(referenced) > 0 {
+            clearedCount += len(referenced)
+            for key := range referenced {
+                l.store.Remove(key)
+            }
         }
+        delete(l.configIndex, hc)
+    }
+    if clearedCount > 0 {
+        klog.V(3).Infof("lruCache.Clear: cleared %d cache entries for %d configs", clearedCount, len(configs))
     }
 }
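
Clear still evicts every cached entry that references one of the updated config keys; the reshuffled code only makes sure the reverse-index entry is dropped even when it is empty, and counts the evictions for the new debug log. A toy map-based version of the same bookkeeping (a stand-in for the real LRU):

    package main

    import "fmt"

    // A toy reverse-indexed cache: configHash -> set of cache keys that depend on it.
    type cache struct {
        store       map[string]string        // cacheKey -> cached value
        configIndex map[string]map[string]bool // configHash -> cacheKeys
    }

    // clear drops every cached entry referenced by the updated configs and
    // returns how many entries were evicted (the new code logs this count).
    func (c *cache) clear(updated []string) int {
        cleared := 0
        for _, hc := range updated {
            for key := range c.configIndex[hc] {
                delete(c.store, key)
                cleared++
            }
            delete(c.configIndex, hc)
        }
        return cleared
    }

    func main() {
        c := &cache{
            store:       map[string]string{"eds/consumer:7070": "cla"},
            configIndex: map[string]map[string]bool{"ServiceEntry/consumer": {"eds/consumer:7070": true}},
        }
        fmt.Println(c.clear([]string{"ServiceEntry/consumer"})) // 1
    }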
diff --git a/sail/pkg/networking/grpcgen/cds.go b/sail/pkg/networking/grpcgen/cds.go
index 5b085654..c5d87142 100644
--- a/sail/pkg/networking/grpcgen/cds.go
+++ b/sail/pkg/networking/grpcgen/cds.go
@@ -2,6 +2,7 @@ package grpcgen
import (
"fmt"
+
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
@@ -98,6 +99,11 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
var defaultCluster *cluster.Cluster
if b.filter.Contains(b.defaultClusterName) {
defaultCluster = b.edsCluster(b.defaultClusterName)
+ // CRITICAL: For gRPC proxyless, we need to set CommonLbConfig
to handle endpoint health status
+ // This ensures that the cluster can use healthy endpoints for
load balancing
+ if defaultCluster.CommonLbConfig == nil {
+ defaultCluster.CommonLbConfig =
&cluster.Cluster_CommonLbConfig{}
+ }
if b.svc.SupportsDrainingEndpoints() {
// see core/v1alpha3/cluster.go
defaultCluster.CommonLbConfig.OverrideHostStatus =
&core.HealthStatusSet{
@@ -106,6 +112,13 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
core.HealthStatus_DRAINING,
core.HealthStatus_UNKNOWN, core.HealthStatus_DEGRADED,
},
}
+ } else {
+ // For gRPC proxyless, only use HEALTHY endpoints by
default
+ defaultCluster.CommonLbConfig.OverrideHostStatus =
&core.HealthStatusSet{
+ Statuses: []core.HealthStatus{
+ core.HealthStatus_HEALTHY,
+ },
+ }
}
}
@@ -130,6 +143,9 @@ func (b *clusterBuilder) edsCluster(name string)
*cluster.Cluster {
},
},
},
+ // CRITICAL: For gRPC proxyless, we need to set LbPolicy to
ROUND_ROBIN
+ // This is the default load balancing policy for gRPC xDS
clients
+ LbPolicy: cluster.Cluster_ROUND_ROBIN,
}
}
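
The CDS change pins clusters that do not support draining to HEALTHY-only host selection and fixes ROUND_ROBIN as the policy, which is what keeps a proxyless gRPC client off not-ready backends. A sketch of building that CommonLbConfig with the go-control-plane types already used in this file (the cluster name is an example value):

    package main

    import (
        "fmt"

        cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
        core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    )

    // healthyOnlyLbConfig returns the CommonLbConfig the generator now applies to
    // EDS clusters that do not support draining endpoints: only HEALTHY hosts are
    // eligible for load balancing.
    func healthyOnlyLbConfig() *cluster.Cluster_CommonLbConfig {
        return &cluster.Cluster_CommonLbConfig{
            OverrideHostStatus: &core.HealthStatusSet{
                Statuses: []core.HealthStatus{core.HealthStatus_HEALTHY},
            },
        }
    }

    func main() {
        c := &cluster.Cluster{
            Name:           "outbound|7070||consumer.grpc-proxyless.svc.cluster.local",
            LbPolicy:       cluster.Cluster_ROUND_ROBIN,
            CommonLbConfig: healthyOnlyLbConfig(),
        }
        fmt.Println(c.CommonLbConfig.OverrideHostStatus.Statuses)
    }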
diff --git a/sail/pkg/networking/grpcgen/grpcgen.go b/sail/pkg/networking/grpcgen/grpcgen.go
index 804ed651..20d46e1f 100644
--- a/sail/pkg/networking/grpcgen/grpcgen.go
+++ b/sail/pkg/networking/grpcgen/grpcgen.go
@@ -8,13 +8,25 @@ import (
type GrpcConfigGenerator struct{}
func (g *GrpcConfigGenerator) Generate(proxy *model.Proxy, w
*model.WatchedResource, req *model.PushRequest) (model.Resources,
model.XdsLogDetails, error) {
+ // Extract requested resource names from WatchedResource
+ // If ResourceNames is empty (wildcard request), pass empty slice
+ // BuildListeners will handle empty names as wildcard request and
generate all listeners
+ var requestedNames []string
+ if w != nil && w.ResourceNames != nil && len(w.ResourceNames) > 0 {
+ requestedNames = w.ResourceNames.UnsortedList()
+ }
+
switch w.TypeUrl {
case v3.ListenerType:
- return g.BuildListeners(proxy, req.Push,
w.ResourceNames.UnsortedList()), model.DefaultXdsLogDetails, nil
+ // Pass requested names to BuildListeners to ensure consistent
behavior
+ // This is critical for gRPC proxyless clients to avoid
resource count oscillation
+ // When requestedNames is empty (wildcard), BuildListeners
generates all listeners
+ // When requestedNames is non-empty, BuildListeners only
generates requested listeners
+ return g.BuildListeners(proxy, req.Push, requestedNames),
model.DefaultXdsLogDetails, nil
case v3.ClusterType:
- return g.BuildClusters(proxy, req.Push,
w.ResourceNames.UnsortedList()), model.DefaultXdsLogDetails, nil
+ return g.BuildClusters(proxy, req.Push, requestedNames),
model.DefaultXdsLogDetails, nil
case v3.RouteType:
- return g.BuildHTTPRoutes(proxy, req.Push,
w.ResourceNames.UnsortedList()), model.DefaultXdsLogDetails, nil
+ return g.BuildHTTPRoutes(proxy, req.Push, requestedNames),
model.DefaultXdsLogDetails, nil
}
return nil, model.DefaultXdsLogDetails, nil
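
Generate now normalizes the watched ResourceNames once: an empty set is passed through as a wildcard, meaning build everything, while a non-empty set is handed to the builders unchanged so only the requested resources are emitted. A tiny sketch of that contract (helper name is illustrative):

    package main

    import "fmt"

    // requestedNames flattens a watched-resource name set the way Generate now does:
    // nil/empty means "wildcard, build everything", otherwise only the listed names.
    func requestedNames(watched map[string]struct{}) []string {
        if len(watched) == 0 {
            return nil // wildcard
        }
        names := make([]string, 0, len(watched))
        for name := range watched {
            names = append(names, name)
        }
        return names
    }

    func main() {
        fmt.Println(requestedNames(nil) == nil) // true: wildcard request
        fmt.Println(requestedNames(map[string]struct{}{
            "consumer.grpc-proxyless.svc.cluster.local:7070": {},
        }))
    }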
diff --git a/sail/pkg/networking/grpcgen/lds.go b/sail/pkg/networking/grpcgen/lds.go
index 810bb662..463e0a74 100644
--- a/sail/pkg/networking/grpcgen/lds.go
+++ b/sail/pkg/networking/grpcgen/lds.go
@@ -6,13 +6,18 @@ import (
"strconv"
"strings"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/dubbo-agent/grpcxds"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/pkg/wellknown"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/networking/util"
"github.com/apache/dubbo-kubernetes/sail/pkg/util/protoconv"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
listener
"github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+ route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+ routerv3
"github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
+ hcmv3
"github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"k8s.io/klog/v2"
)
@@ -70,9 +75,11 @@ func newListenerNameFilter(names []string, node
*model.Proxy) listenerNames {
}
func (g *GrpcConfigGenerator) BuildListeners(node *model.Proxy, push
*model.PushContext, names []string) model.Resources {
- // If no specific names requested, generate listeners for all
ServiceTargets
+ // For LDS (wildcard type), empty ResourceNames means request all
listeners
+ // If names is empty, generate listeners for all ServiceTargets to
ensure consistent behavior
+ // This prevents the client from receiving different numbers of
listeners on different requests
if len(names) == 0 && len(node.ServiceTargets) > 0 {
- // Build listener names from ServiceTargets for initial request
+ // Build listener names from ServiceTargets for
wildcard/initial request
names = make([]string, 0, len(node.ServiceTargets))
for _, st := range node.ServiceTargets {
if st.Service != nil && st.Port.ServicePort != nil {
@@ -81,19 +88,242 @@ func (g *GrpcConfigGenerator) BuildListeners(node
*model.Proxy, push *model.Push
names = append(names, listenerName)
}
}
+ klog.V(2).Infof("BuildListeners: wildcard request for %s,
generating %d listeners from ServiceTargets: %v", node.ID, len(names), names)
+ } else if len(names) > 0 {
+ klog.V(2).Infof("BuildListeners: specific request for %s,
requested listeners: %v", node.ID, names)
}
+ // CRITICAL: If names is provided (non-empty), we MUST only generate
those specific listeners
+ // This prevents the push loop where client requests 1 listener but
receives 14
+ // The filter ensures we only generate requested listeners
filter := newListenerNameFilter(names, node)
resp := make(model.Resources, 0, len(filter))
- resp = append(resp, buildOutboundListeners(node, push, filter)...)
+
+ // Build outbound listeners first (they may reference clusters that
need CDS)
+ outboundRes := buildOutboundListeners(node, push, filter)
+ resp = append(resp, outboundRes...)
resp = append(resp, buildInboundListeners(node, push,
filter.inboundNames())...)
+ // Final safety check: ensure we only return resources that were
requested
+ // This is critical for preventing push loops in proxyless gRPC
+ if len(names) > 0 {
+ requestedSet := sets.New(names...)
+ filtered := make(model.Resources, 0, len(resp))
+ for _, r := range resp {
+ if requestedSet.Contains(r.Name) {
+ filtered = append(filtered, r)
+ } else {
+ klog.V(2).Infof("BuildListeners: filtering out
unrequested listener %s (requested: %v)", r.Name, names)
+ }
+ }
+ return filtered
+ }
+
return resp
}
func buildOutboundListeners(node *model.Proxy, push *model.PushContext, filter
listenerNames) model.Resources {
out := make(model.Resources, 0, len(filter))
- // TODO SidecarScope?
+
+ // Extract outbound listener names (not inbound)
+ // The filter map uses hostname as key, and stores ports in
listenerName.Ports
+ // We need to reconstruct the full listener name (hostname:port) for
each port
+ type listenerInfo struct {
+ hostname string
+ ports []string
+ }
+ var outboundListeners []listenerInfo
+
+ for name, ln := range filter {
+ if strings.HasPrefix(name, grpcxds.ServerListenerNamePrefix) {
+ continue // Skip inbound listeners
+ }
+
+ // If this entry has ports, use them; otherwise use the name
as-is (might already have port)
+ if len(ln.Ports) > 0 {
+ ports := make([]string, 0, len(ln.Ports))
+ for port := range ln.Ports {
+ ports = append(ports, port)
+ }
+ outboundListeners = append(outboundListeners,
listenerInfo{
+ hostname: name,
+ ports: ports,
+ })
+ } else {
+ // No ports in filter, try to parse name as
hostname:port
+ _, _, err := net.SplitHostPort(name)
+ if err == nil {
+ // Name already contains port, use it directly
+ outboundListeners = append(outboundListeners,
listenerInfo{
+ hostname: name,
+ ports: []string{name}, // Use full
name as-is
+ })
+ } else {
+ // No port, skip (shouldn't happen for outbound
listeners)
+ klog.V(2).Infof("buildOutboundListeners:
skipping listener %s (no port information)", name)
+ }
+ }
+ }
+
+ if len(outboundListeners) == 0 {
+ return out
+ }
+
+ // Build outbound listeners for each requested service
+ for _, li := range outboundListeners {
+ // For each port, build a listener
+ for _, portInfo := range li.ports {
+ var hostStr, portStr string
+ var err error
+
+ // Check if portInfo is already "hostname:port" format
+ hostStr, portStr, err = net.SplitHostPort(portInfo)
+ if err != nil {
+ // portInfo is just the port number, use
hostname from filter key
+ hostStr = li.hostname
+ portStr = portInfo
+ }
+
+ port, err := strconv.Atoi(portStr)
+ if err != nil {
+ klog.Warningf("buildOutboundListeners: failed
to parse port from %s: %v", portStr, err)
+ continue
+ }
+
+ // Reconstruct full listener name for logging and final
check
+ fullListenerName := fmt.Sprintf("%s:%s", hostStr,
portStr)
+
+ // Find service in PushContext
+ // Try different hostname formats: FQDN, short name,
etc.
+ hostname := host.Name(hostStr)
+ svc := push.ServiceForHostname(node, hostname)
+
+ // Extract short name from FQDN for fallback lookup
+ parts := strings.Split(hostStr, ".")
+ shortName := ""
+ if len(parts) > 0 {
+ shortName = parts[0]
+ }
+
+ // If not found with FQDN, try short name (e.g.,
"consumer" from "consumer.grpc-proxyless.svc.cluster.local")
+ if svc == nil && shortName != "" {
+ svc = push.ServiceForHostname(node,
host.Name(shortName))
+ if svc != nil {
+
klog.V(2).Infof("buildOutboundListeners: found service %s using short name %s",
svc.Hostname, shortName)
+ }
+ }
+
+ // If still not found, try to find service by iterating
all services
+ if svc == nil {
+ // Try to find service by matching hostname in
all namespaces
+ allServices := push.GetAllServices()
+ for _, s := range allServices {
+ if s.Hostname == hostname || (shortName
!= "" && strings.HasPrefix(string(s.Hostname), shortName+".")) {
+ svc = s
+
klog.V(2).Infof("buildOutboundListeners: found service %s/%s by iterating",
svc.Attributes.Namespace, svc.Hostname)
+ break
+ }
+ }
+ }
+
+ if svc == nil {
+ klog.Warningf("buildOutboundListeners: service
not found for hostname %s (tried FQDN and short name)", hostStr)
+ continue
+ }
+
+ // Verify service has the requested port (Service port,
not targetPort)
+ hasPort := false
+ var matchedPort *model.Port
+ for _, p := range svc.Ports {
+ if p.Port == port {
+ hasPort = true
+ matchedPort = p
+ break
+ }
+ }
+ if !hasPort {
+ klog.Warningf("buildOutboundListeners: port %d
not found in service %s (available ports: %v)",
+ port, hostStr, func() []int {
+ ports := make([]int, 0,
len(svc.Ports))
+ for _, p := range svc.Ports {
+ ports = append(ports,
p.Port)
+ }
+ return ports
+ }())
+ continue
+ }
+
+ klog.V(2).Infof("buildOutboundListeners: building
outbound listener for %s:%d (service: %s/%s, port: %s)",
+ hostStr, port, svc.Attributes.Namespace,
svc.Attributes.Name, matchedPort.Name)
+
+ // Build cluster name using BuildSubsetKey to ensure
correct format
+ // Format: outbound|port||hostname (e.g.,
outbound|7070||consumer.grpc-proxyless.svc.cluster.local)
+ // Use svc.Hostname (FQDN) instead of
svc.Attributes.Name (short name) to match CDS expectations
+ clusterName :=
model.BuildSubsetKey(model.TrafficDirectionOutbound, "", svc.Hostname, port)
+
+ // Build route name (same format as cluster name) for
RDS
+ routeName := clusterName
+
+ // CRITICAL: For gRPC proxyless, outbound listeners
MUST use ApiListener with RDS
+ // This is the correct pattern used by Istio for gRPC
xDS clients
+ // Using FilterChain with inline RouteConfig causes the
gRPC client to remain in IDLE state
+ hcm := &hcmv3.HttpConnectionManager{
+ CodecType: hcmv3.HttpConnectionManager_AUTO,
+ StatPrefix: fmt.Sprintf("outbound_%d_%s", port,
svc.Attributes.Name),
+ RouteSpecifier:
&hcmv3.HttpConnectionManager_Rds{
+ Rds: &hcmv3.Rds{
+ ConfigSource:
&core.ConfigSource{
+ ConfigSourceSpecifier:
&core.ConfigSource_Ads{
+ Ads:
&core.AggregatedConfigSource{},
+ },
+ },
+ RouteConfigName: routeName,
+ },
+ },
+ HttpFilters: []*hcmv3.HttpFilter{
+ {
+ Name:
"envoy.filters.http.router",
+ ConfigType:
&hcmv3.HttpFilter_TypedConfig{
+ TypedConfig:
protoconv.MessageToAny(&routerv3.Router{}),
+ },
+ },
+ },
+ }
+
+ // Build outbound listener with ApiListener (Istio
pattern)
+ // CRITICAL: gRPC xDS clients expect ApiListener for
outbound, not FilterChain
+ ll := &listener.Listener{
+ Name: fullListenerName,
+ Address: &core.Address{Address:
&core.Address_SocketAddress{
+ SocketAddress: &core.SocketAddress{
+ Address:
svc.GetAddressForProxy(node), // Use service VIP
+ PortSpecifier:
&core.SocketAddress_PortValue{
+ PortValue: uint32(port),
+ },
+ },
+ }},
+ ApiListener: &listener.ApiListener{
+ ApiListener:
protoconv.MessageToAny(hcm),
+ },
+ }
+
+ // Add extra addresses if available
+ extrAddresses := svc.GetExtraAddressesForProxy(node)
+ if len(extrAddresses) > 0 {
+ ll.AdditionalAddresses =
util.BuildAdditionalAddresses(extrAddresses, uint32(port))
+ }
+
+ // CRITICAL: Log listener details for debugging gRPC
xDS client connection issues
+ klog.V(3).Infof("buildOutboundListeners: created
ApiListener name=%s, address=%s:%d, routeName=%s",
+ ll.Name, ll.Address.GetSocketAddress().Address,
ll.Address.GetSocketAddress().GetPortValue(), routeName)
+
+ out = append(out, &discovery.Resource{
+ Name: ll.Name,
+ Resource: protoconv.MessageToAny(ll),
+ })
+ }
+ }
+
return out
}
@@ -117,14 +347,13 @@ func buildInboundListeners(node *model.Proxy, push
*model.PushContext, names []s
}
}
- // If names are provided, use them; otherwise, generate listeners for
all ServiceTargets
+ // Use the provided names - at this point names should not be empty
+ // (empty names are handled in BuildListeners to generate all listeners)
+ // This ensures we only generate listeners that were requested,
preventing inconsistent behavior
listenerNames := names
if len(listenerNames) == 0 {
- // Generate listener names from ServiceTargets
- for port := range serviceInstancesByPort {
- listenerName := fmt.Sprintf("%s0.0.0.0:%d",
grpcxds.ServerListenerNamePrefix, port)
- listenerNames = append(listenerNames, listenerName)
- }
+ // This should not happen if BuildListeners logic is correct,
but handle gracefully
+ return out
}
for _, name := range listenerNames {
@@ -141,28 +370,54 @@ func buildInboundListeners(node *model.Proxy, push
*model.PushContext, names []s
}
si, ok := serviceInstancesByPort[uint32(listenPort)]
if !ok {
- // If no service target found for this port, still
create a minimal listener
- // This ensures we respond with at least one listener
for connection initialization
- klog.V(2).Infof("%s has no service instance for port
%s, creating minimal listener", node.ID, listenPortStr)
- ll := &listener.Listener{
- Name: name,
- Address: &core.Address{Address:
&core.Address_SocketAddress{
- SocketAddress: &core.SocketAddress{
- Address: listenHost,
- PortSpecifier:
&core.SocketAddress_PortValue{
- PortValue:
uint32(listenPort),
+ // If no service target found for this port, don't
create a listener
+ // This prevents creating invalid listeners that would
cause SERVING/NOT_SERVING cycles
+ // The client should only request listeners for ports
that are actually exposed by the service
+ klog.Warningf("%s has no service instance for port %s,
skipping listener %s. "+
+ "This usually means the requested port doesn't
match any service port in the pod's ServiceTargets.",
+ node.ID, listenPortStr, name)
+ continue
+ }
+
+ // For proxyless gRPC inbound listeners, we need a FilterChain
with HttpConnectionManager filter
+ // to satisfy gRPC client requirements. According to grpc-go
issue #7691 and the error
+ // "missing HttpConnectionManager filter", gRPC proxyless
clients require HttpConnectionManager
+ // in the FilterChain for inbound listeners.
+ // Use inline RouteConfig instead of RDS to avoid triggering
additional RDS requests that cause push loops
+ // For proxyless gRPC, inline configuration is preferred to
minimize round-trips
+ routeName := fmt.Sprintf("%d", listenPort)
+ hcm := &hcmv3.HttpConnectionManager{
+ CodecType: hcmv3.HttpConnectionManager_AUTO,
+ StatPrefix: fmt.Sprintf("inbound_%d", listenPort),
+ RouteSpecifier:
&hcmv3.HttpConnectionManager_RouteConfig{
+ RouteConfig: &route.RouteConfiguration{
+ Name: routeName,
+ VirtualHosts: []*route.VirtualHost{
+ {
+ Name:
"inbound|http|" + routeName,
+ Domains: []string{"*"},
+ Routes: []*route.Route{
+ {
+ Match:
&route.RouteMatch{
+
PathSpecifier: &route.RouteMatch_Prefix{
+
Prefix: "/",
+
},
+ },
+ Action:
&route.Route_NonForwardingAction{},
+ },
+ },
},
},
- }},
- FilterChains: nil,
- ListenerFilters: nil,
- UseOriginalDst: nil,
- }
- out = append(out, &discovery.Resource{
- Name: ll.Name,
- Resource: protoconv.MessageToAny(ll),
- })
- continue
+ },
+ },
+ HttpFilters: []*hcmv3.HttpFilter{
+ {
+ Name: "envoy.filters.http.router",
+ ConfigType:
&hcmv3.HttpFilter_TypedConfig{
+ TypedConfig:
protoconv.MessageToAny(&routerv3.Router{}),
+ },
+ },
+ },
}
ll := &listener.Listener{
@@ -175,7 +430,19 @@ func buildInboundListeners(node *model.Proxy, push
*model.PushContext, names []s
},
},
}},
- FilterChains: nil,
+ // Create FilterChain with HttpConnectionManager filter
for proxyless gRPC
+ FilterChains: []*listener.FilterChain{
+ {
+ Filters: []*listener.Filter{
+ {
+ Name:
wellknown.HTTPConnectionManager,
+ ConfigType:
&listener.Filter_TypedConfig{
+ TypedConfig:
protoconv.MessageToAny(hcm),
+ },
+ },
+ },
+ },
+ },
// the following must not be set or the client will NACK
ListenerFilters: nil,
UseOriginalDst: nil,
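
The listener generator leans on a few naming conventions: outbound listeners are named `<service FQDN>:<port>` and carry an ApiListener whose HttpConnectionManager points at an RDS route named like the cluster, while inbound listeners use the grpcxds server-listener prefix with `0.0.0.0:<container port>`. A sketch of just the naming side, with hypothetical helpers and an illustrative prefix value:

    package main

    import "fmt"

    // Naming conventions assumed by the generator (helpers are illustrative):
    //   outbound listener: "<service FQDN>:<service port>", served as an ApiListener,
    //   cluster / outbound route: "outbound|<port>||<service FQDN>",
    //   inbound listener: grpcxds.ServerListenerNamePrefix + "0.0.0.0:<container port>".
    func outboundListenerName(hostname string, port int) string {
        return fmt.Sprintf("%s:%d", hostname, port)
    }

    func outboundClusterName(hostname string, port int) string {
        return fmt.Sprintf("outbound|%d||%s", port, hostname)
    }

    func inboundListenerName(prefix string, port int) string {
        return fmt.Sprintf("%s0.0.0.0:%d", prefix, port)
    }

    func main() {
        const host = "consumer.grpc-proxyless.svc.cluster.local"
        fmt.Println(outboundListenerName(host, 7070)) // consumer.grpc-proxyless.svc.cluster.local:7070
        fmt.Println(outboundClusterName(host, 7070))  // outbound|7070||consumer.grpc-proxyless.svc.cluster.local
        // Prefix value below is only a placeholder; the real one comes from grpcxds.ServerListenerNamePrefix.
        fmt.Println(inboundListenerName("server-listener-prefix/", 17070))
    }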
diff --git a/sail/pkg/networking/grpcgen/rds.go b/sail/pkg/networking/grpcgen/rds.go
index dae825b4..4d5d770f 100644
--- a/sail/pkg/networking/grpcgen/rds.go
+++ b/sail/pkg/networking/grpcgen/rds.go
@@ -1,6 +1,10 @@
package grpcgen
import (
+ "fmt"
+ "strconv"
+ "strings"
+
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/util/protoconv"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
@@ -22,19 +26,87 @@ func (g *GrpcConfigGenerator) BuildHTTPRoutes(node
*model.Proxy, push *model.Pus
}
func buildHTTPRoute(node *model.Proxy, push *model.PushContext, routeName
string) *route.RouteConfiguration {
- // TODO use route-style naming instead of cluster naming
- _, _, hostname, port := model.ParseSubsetKey(routeName)
- if hostname == "" || port == 0 {
- klog.Warningf("failed to parse %v", routeName)
- return nil
- }
+ // For proxyless gRPC inbound routes, routeName is just the port number
(e.g., "17070")
+ // For outbound routes, routeName is cluster format
(outbound|port||hostname)
+ _, err := strconv.Atoi(routeName)
+ if err != nil {
+ // Try to parse as cluster naming format
(outbound|port||hostname)
+ _, _, hostname, parsedPort := model.ParseSubsetKey(routeName)
+ if hostname == "" || parsedPort == 0 {
+ klog.Warningf("failed to parse route name %v",
routeName)
+ return nil
+ }
+
+ // Build outbound route configuration for gRPC proxyless
+ // This is used by ApiListener to route traffic to the correct
cluster
+ svc := push.ServiceForHostname(node, hostname)
+ if svc == nil {
+ klog.Warningf("buildHTTPRoute: service not found for
hostname %s", hostname)
+ return nil
+ }
- // virtualHosts, _, _ := core.BuildSidecarOutboundVirtualHosts(node,
push, routeName, port, nil, &model.DisabledCache{})
+ // Build VirtualHost Domains for outbound route
+ // CRITICAL: Domains must match the xDS URL hostname for gRPC
xDS client
+ hostStr := string(hostname)
+ domains := []string{
+ fmt.Sprintf("%s:%d", hostStr, parsedPort), // FQDN with
port - MOST SPECIFIC
+ hostStr, // Full FQDN
+ }
+ // Add short name if different from FQDN
+ hostParts := strings.Split(hostStr, ".")
+ if len(hostParts) > 0 && hostParts[0] != hostStr {
+ shortName := hostParts[0]
+ domains = append(domains, fmt.Sprintf("%s:%d",
shortName, parsedPort)) // Short name with port
+ domains = append(domains, shortName)
// Short name
+ }
+ domains = append(domains, "*") // Wildcard for any domain -
LEAST SPECIFIC
+
+ return &route.RouteConfiguration{
+ Name: routeName,
+ VirtualHosts: []*route.VirtualHost{
+ {
+ Name: fmt.Sprintf("%s|http|%d",
hostStr, parsedPort),
+ Domains: domains,
+ Routes: []*route.Route{
+ {
+ Match:
&route.RouteMatch{
+ PathSpecifier:
&route.RouteMatch_Prefix{
+ Prefix:
"/",
+ },
+ },
+ Action:
&route.Route_Route{
+ Route:
&route.RouteAction{
+
ClusterSpecifier: &route.RouteAction_Cluster{
+
Cluster: routeName, // Use routeName (cluster name)
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+ }
- // Only generate the required route for grpc. Will need to generate more
- // as GRPC adds more features.
+ // Build minimal route configuration for proxyless gRPC inbound listener
+ // NonForwardingAction indicates this is an inbound listener that
should handle requests directly
return &route.RouteConfiguration{
- Name: routeName,
- VirtualHosts: nil,
+ Name: routeName,
+ VirtualHosts: []*route.VirtualHost{
+ {
+ Name: "inbound|http|" + routeName,
+ Domains: []string{"*"},
+ Routes: []*route.Route{
+ {
+ Match: &route.RouteMatch{
+ PathSpecifier:
&route.RouteMatch_Prefix{
+ Prefix: "/",
+ },
+ },
+ Action:
&route.Route_NonForwardingAction{},
+ },
+ },
+ },
+ },
}
}
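
buildHTTPRoute now dispatches on the route name: a bare port number means an inbound, non-forwarding route, while anything else is parsed as the `outbound|port||hostname` cluster format and answered with domains ranging from `host:port` down to the `*` catch-all. A sketch of that dispatch and domain list (helper names are illustrative):

    package main

    import (
        "fmt"
        "strconv"
        "strings"
    )

    // routeDomains builds the VirtualHost domains the outbound route answers for:
    // most-specific "host:port" first, then the bare FQDN, the short-name variants,
    // and finally the "*" catch-all.
    func routeDomains(hostname string, port int) []string {
        domains := []string{fmt.Sprintf("%s:%d", hostname, port), hostname}
        if short := strings.SplitN(hostname, ".", 2)[0]; short != hostname {
            domains = append(domains, fmt.Sprintf("%s:%d", short, port), short)
        }
        return append(domains, "*")
    }

    // isInboundRouteName mirrors the dispatch in buildHTTPRoute: a bare port number
    // names an inbound (non-forwarding) route, anything else is parsed as the
    // "outbound|port||hostname" cluster format.
    func isInboundRouteName(name string) bool {
        _, err := strconv.Atoi(name)
        return err == nil
    }

    func main() {
        fmt.Println(isInboundRouteName("17070")) // true
        fmt.Println(isInboundRouteName("outbound|7070||consumer.grpc-proxyless.svc.cluster.local")) // false
        fmt.Println(routeDomains("consumer.grpc-proxyless.svc.cluster.local", 7070))
    }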
diff --git a/sail/pkg/networking/util/util.go b/sail/pkg/networking/util/util.go
index 3da87664..d2061faa 100644
--- a/sail/pkg/networking/util/util.go
+++ b/sail/pkg/networking/util/util.go
@@ -2,12 +2,13 @@ package util
import (
"fmt"
+ "strings"
+
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
dubbonetworking "github.com/apache/dubbo-kubernetes/sail/pkg/networking"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
listener
"github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
- "strings"
)
const (
diff --git a/sail/pkg/serviceregistry/kube/controller/controller.go b/sail/pkg/serviceregistry/kube/controller/controller.go
index b318ebdb..1cece128 100644
--- a/sail/pkg/serviceregistry/kube/controller/controller.go
+++ b/sail/pkg/serviceregistry/kube/controller/controller.go
@@ -19,6 +19,7 @@ package controller
import (
"sort"
+ "strings"
"sync"
"time"
@@ -48,6 +49,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
)
@@ -201,6 +203,41 @@ func (c *Controller) GetProxyServiceTargets(proxy
*model.Proxy) []model.ServiceT
proxyNamespace = proxy.Metadata.Namespace
}
+ // Get pod by proxy IP address to check if service selector matches pod
labels
+ // Also get pod to resolve targetPort from container ports
+ // If IP is empty, try to find pod by node ID (format:
podname.namespace)
+ var podLabels labels.Instance
+ var pod *v1.Pod
+ if len(proxy.IPAddresses) > 0 {
+ pods := c.pods.getPodsByIP(proxy.IPAddresses[0])
+ if len(pods) > 0 {
+ // Use the first pod's labels (in most cases there
should be only one pod per IP)
+ pod = pods[0]
+ podLabels = labels.Instance(pod.Labels)
+ }
+ } else if proxy.ID != "" {
+ // If IP address is empty, try to find pod by node ID
+ // Node ID format for proxyless is: podname.namespace
+ parts := strings.Split(proxy.ID, ".")
+ if len(parts) >= 2 {
+ podName := parts[0]
+ podNamespace := parts[1]
+ // Try to get pod by name and namespace
+ podKey := types.NamespacedName{Name: podName,
Namespace: podNamespace}
+ pod = c.pods.getPodByKey(podKey)
+ if pod != nil {
+ // Extract IP from pod and set it to proxy
+ if pod.Status.PodIP != "" {
+ proxy.IPAddresses =
[]string{pod.Status.PodIP}
+
klog.V(2).Infof("GetProxyServiceTargets: set proxy IP from pod %s/%s: %s",
podNamespace, podName, pod.Status.PodIP)
+ }
+ podLabels = labels.Instance(pod.Labels)
+ } else {
+ klog.V(2).Infof("GetProxyServiceTargets: pod
%s/%s not found by node ID", podNamespace, podName)
+ }
+ }
+ }
+
out := make([]model.ServiceTarget, 0)
c.RLock()
defer c.RUnlock()
@@ -236,16 +273,65 @@ func (c *Controller) GetProxyServiceTargets(proxy
*model.Proxy) []model.ServiceT
continue
}
+ // For services in the same namespace, check if the service
selector matches pod labels
+ // This ensures we only return services that actually select
this pod
+ if svc.Attributes.Namespace == proxyNamespace && podLabels !=
nil {
+ serviceSelector :=
labels.Instance(svc.Attributes.LabelSelectors)
+ if len(serviceSelector) > 0 {
+ // If service has a selector, check if it
matches pod labels
+ if !serviceSelector.Match(podLabels) {
+ // Service selector doesn't match pod
labels, skip this service
+ continue
+ }
+ }
+ }
+
+ // Get the original Kubernetes Service to resolve targetPort
+ var kubeSvc *v1.Service
+ if c.services != nil {
+ kubeSvc = c.services.Get(svc.Attributes.Name,
svc.Attributes.Namespace)
+ }
+
// Create a ServiceTarget for each port
for _, port := range svc.Ports {
if port == nil {
continue
}
+ targetPort := uint32(port.Port) // Default to service
port
+
+ // Try to resolve actual targetPort from Kubernetes
Service and Pod
+ if kubeSvc != nil {
+ // Find the matching ServicePort in Kubernetes
Service
+ for _, kubePort := range kubeSvc.Spec.Ports {
+ if kubePort.Name == port.Name &&
int32(kubePort.Port) == int32(port.Port) {
+ // Resolve targetPort from
ServicePort.TargetPort
+ if kubePort.TargetPort.Type ==
intstr.Int {
+ // TargetPort is a
number
+ targetPort =
uint32(kubePort.TargetPort.IntVal)
+ } else if
kubePort.TargetPort.Type == intstr.String && pod != nil {
+ // TargetPort is a
string (port name), resolve from Pod container ports
+ for _, container :=
range pod.Spec.Containers {
+ for _,
containerPort := range container.Ports {
+ if
containerPort.Name == kubePort.TargetPort.StrVal {
+
targetPort = uint32(containerPort.ContainerPort)
+
break
+ }
+ }
+ if targetPort
!= uint32(port.Port) {
+ break
+ }
+ }
+ }
+ break
+ }
+ }
+ }
+
target := model.ServiceTarget{
Service: svc,
Port: model.ServiceInstancePort{
ServicePort: port,
- TargetPort: uint32(port.Port),
+ TargetPort: targetPort,
},
}
out = append(out, target)
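
GetProxyServiceTargets now resolves the real target port instead of assuming it equals the service port: a numeric targetPort is taken as-is, a named targetPort is looked up in the pod's container ports, and the service port remains the fallback. A sketch of that lookup using intstr and a pared-down container-port type (the helper is hypothetical):

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/util/intstr"
    )

    // containerPort is a pared-down stand-in for corev1.ContainerPort.
    type containerPort struct {
        Name string
        Port int32
    }

    // resolveTargetPort mirrors the controller's lookup: a numeric targetPort is used
    // directly, a named targetPort is resolved against the pod's container ports, and
    // the service port is the fallback when nothing matches.
    func resolveTargetPort(target intstr.IntOrString, servicePort int32, podPorts []containerPort) int32 {
        switch target.Type {
        case intstr.Int:
            if target.IntVal != 0 {
                return target.IntVal
            }
        case intstr.String:
            for _, cp := range podPorts {
                if cp.Name == target.StrVal {
                    return cp.Port
                }
            }
        }
        return servicePort
    }

    func main() {
        podPorts := []containerPort{{Name: "grpc", Port: 17070}}
        fmt.Println(resolveTargetPort(intstr.FromString("grpc"), 7070, podPorts)) // 17070
        fmt.Println(resolveTargetPort(intstr.FromInt(8080), 7070, podPorts))      // 8080
        fmt.Println(resolveTargetPort(intstr.IntOrString{}, 7070, nil))           // falls back to the service port
    }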
diff --git a/sail/pkg/serviceregistry/kube/controller/endpointslice.go b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
index 0af5fa38..4c381a7b 100644
--- a/sail/pkg/serviceregistry/kube/controller/endpointslice.go
+++ b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
@@ -1,6 +1,10 @@
package controller
import (
+ "fmt"
+ "strings"
+ "sync"
+
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
@@ -16,9 +20,9 @@ import (
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/intstr"
+ "k8s.io/klog/v2"
mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
- "strings"
- "sync"
)
var (
@@ -180,6 +184,21 @@ func (esc *endpointSliceController)
updateEndpointCacheForSlice(hostName host.Na
// Draining tracking is only enabled if persistent sessions is
enabled.
// If we start using them for other features, this can be
adjusted.
healthStatus := endpointHealthStatus(svc, e)
+
+ // CRITICAL: Log health status for debugging (use Warningf to
ensure visibility)
+ if len(e.Addresses) > 0 {
+ ready := "nil"
+ terminating := "nil"
+ if e.Conditions.Ready != nil {
+ ready = fmt.Sprintf("%v", *e.Conditions.Ready)
+ }
+ if e.Conditions.Terminating != nil {
+ terminating = fmt.Sprintf("%v",
*e.Conditions.Terminating)
+ }
+ klog.Warningf("endpointHealthStatus: address=%s,
Ready=%s, Terminating=%s, HealthStatus=%v, svc=%v",
+ e.Addresses[0], ready, terminating,
healthStatus, svc != nil)
+ }
+
for _, a := range e.Addresses {
pod, expectedPod := getPod(esc.c, a,
&metav1.ObjectMeta{Name: epSlice.Name, Namespace: epSlice.Namespace},
e.TargetRef, hostName)
if pod == nil && expectedPod {
@@ -189,17 +208,162 @@ func (esc *endpointSliceController)
updateEndpointCacheForSlice(hostName host.Na
var overrideAddresses []string
builder := esc.c.NewEndpointBuilder(pod)
// EDS and ServiceEntry use name for service port - ADS
will need to map to numbers.
+ // CRITICAL FIX: Always use Service.Port.Name as source
of truth for ServicePortName
+ // - Use EndpointSlice.Port.Port (service port number)
as endpointPort
+ // - Always resolve portName from Service by matching
port number (Service is source of truth)
+ // - This ensures ep.ServicePortName matches
svcPort.Name in BuildClusterLoadAssignment
+ kubeSvc := esc.c.services.Get(svcNamespacedName.Name,
svcNamespacedName.Namespace)
for _, port := range epSlice.Ports {
- var portNum int32
+ var epSlicePortNum int32
if port.Port != nil {
- portNum = *port.Port
+ epSlicePortNum = *port.Port
}
- var portName string
+ var epSlicePortName string
if port.Name != nil {
- portName = *port.Name
+ epSlicePortName = *port.Name
+ }
+
+ var servicePortNum int32
+ var targetPortNum int32
+ var portName string
+
+ // CRITICAL FIX: EndpointSlice.Port.Port might
be targetPort or service port
+ // We need to find the matching ServicePort and
resolve:
+ // 1. servicePortNum (Service.Port) - used for
matching in BuildClusterLoadAssignment
+ // 2. targetPortNum (Service.TargetPort) - used
as EndpointPort
+ // 3. portName (Service.Port.Name) - used as
ServicePortName for filtering
+ if kubeSvc != nil {
+ matched := false
+ for _, kubePort := range
kubeSvc.Spec.Ports {
+ // Try matching by port number
(service port or targetPort)
+ portMatches :=
int32(kubePort.Port) == epSlicePortNum
+
+ // Also try matching by
targetPort
+ var kubeTargetPort int32
+ if kubePort.TargetPort.Type ==
intstr.Int {
+ kubeTargetPort =
kubePort.TargetPort.IntVal
+ } else if
kubePort.TargetPort.Type == intstr.String && pod != nil {
+ // Resolve targetPort
name from Pod
+ for _, container :=
range pod.Spec.Containers {
+ for _,
containerPort := range container.Ports {
+ if
containerPort.Name == kubePort.TargetPort.StrVal {
+
kubeTargetPort = containerPort.ContainerPort
+
break
+ }
+ }
+ if
kubeTargetPort != 0 {
+ break
+ }
+ }
+ }
+ targetPortMatches :=
kubeTargetPort == epSlicePortNum
+
+ // Match by port name if
available
+ nameMatches := epSlicePortName
== "" || kubePort.Name == epSlicePortName
+
+ if (portMatches ||
targetPortMatches) && (epSlicePortName == "" || nameMatches) {
+ // Found matching
ServicePort
+ servicePortNum =
int32(kubePort.Port)
+ portName = kubePort.Name
+
+ // Resolve targetPortNum
+ if
kubePort.TargetPort.Type == intstr.Int {
+ targetPortNum =
kubePort.TargetPort.IntVal
+ } else if
kubePort.TargetPort.Type == intstr.String && pod != nil {
+ // Resolve
targetPort name from Pod container ports
+ for _,
container := range pod.Spec.Containers {
+ for _,
containerPort := range container.Ports {
+
if containerPort.Name == kubePort.TargetPort.StrVal {
+
targetPortNum = containerPort.ContainerPort
+
break
+
}
+ }
+ if
targetPortNum != 0 {
+
break
+ }
+ }
+ } else {
+ // If
targetPort is string but pod not found, use service port as fallback
+ targetPortNum =
servicePortNum
+ }
+
+ // If targetPortNum is
still 0, use service port
+ if targetPortNum == 0 {
+ targetPortNum =
servicePortNum
+ }
+
+ matched = true
+
klog.V(2).InfoS("updateEndpointCacheForSlice: matched ServicePort
(servicePort=%d, targetPort=%d, portName='%s') for EndpointSlice.Port
(portNum=%d, portName='%s')",
+ servicePortNum,
targetPortNum, portName, epSlicePortNum, epSlicePortName)
+ break
+ }
+ }
+
+ if !matched {
+
klog.V(2).InfoS("updateEndpointCacheForSlice: failed to match
EndpointSlice.Port (portNum=%d, portName='%s') with Service %s, using
EndpointSlice values",
+ epSlicePortNum,
epSlicePortName, svcNamespacedName.Name)
+ // Fallback: use EndpointSlice
values
+ servicePortNum = epSlicePortNum
+ targetPortNum = epSlicePortNum
+ if epSlicePortName != "" {
+ portName =
epSlicePortName
+ }
+ }
+ } else {
+ // Service not found, use EndpointSlice
values
+ servicePortNum = epSlicePortNum
+ targetPortNum = epSlicePortNum
+ if epSlicePortName != "" {
+ portName = epSlicePortName
+ }
+
klog.V(2).InfoS("updateEndpointCacheForSlice: Service not found for %s, using
EndpointSlice values (portNum=%d, portName='%s')",
+ svcNamespacedName.Name,
epSlicePortNum, epSlicePortName)
}
- dubboEndpoint := builder.buildDubboEndpoint(a,
portNum, portName, nil, healthStatus, svc.SupportsUnhealthyEndpoints())
+ // CRITICAL: Log endpoint creation with actual
values for debugging
+ klog.V(2).InfoS("updateEndpointCacheForSlice:
creating endpoint for service %s (address=%s, servicePortNum=%d,
targetPortNum=%d, portName='%s', hostname=%s, kubeSvc=%v)",
+ svcNamespacedName.Name, a,
servicePortNum, targetPortNum, portName, hostName, kubeSvc != nil)
+
+ // CRITICAL FIX: According to Istio's
implementation and Kubernetes EndpointSlice spec:
+ // - EndpointSlice.Port.Port should be the
Service Port (not targetPort)
+ // - But IstioEndpoint.EndpointPort should be
the targetPort (container port)
+ // - ServicePortName should match
Service.Port.Name for filtering in BuildClusterLoadAssignment
+ //
+ // We use targetPortNum as EndpointPort because
that's what the container actually listens on.
+ // The servicePortNum is used for matching in
BuildClusterLoadAssignment via portName.
+ //
+ // CRITICAL: svc.SupportsUnhealthyEndpoints()
returns true if Service has publishNotReadyAddresses=true
+ // This allows endpoints with Ready=false to be
included in EDS, which is useful for services
+ // that need to receive traffic even before
they are fully ready (e.g., during startup).
+ // CRITICAL FIX: Check if svc is nil before
calling SupportsUnhealthyEndpoints
+ var supportsUnhealthy bool
+ if svc != nil {
+ supportsUnhealthy =
svc.SupportsUnhealthyEndpoints()
+ } else {
+ // If service is nil, default to false
(don't support unhealthy endpoints)
+ supportsUnhealthy = false
+ }
+ dubboEndpoint := builder.buildDubboEndpoint(a,
targetPortNum, portName, nil, healthStatus, supportsUnhealthy)
+
+ // CRITICAL: Log if endpoint is unhealthy and
service doesn't support it
+ if healthStatus == model.UnHealthy &&
!supportsUnhealthy {
+ if svc != nil {
+
klog.V(2).InfoS("updateEndpointCacheForSlice: endpoint %s is unhealthy
(HealthStatus=%v) but service %s does not support unhealthy endpoints
(PublishNotReadyAddresses=%v). Endpoint will be filtered in EDS.",
+ a, healthStatus,
svcNamespacedName.Name, svc.Attributes.PublishNotReadyAddresses)
+ } else {
+
klog.V(2).InfoS("updateEndpointCacheForSlice: endpoint %s is unhealthy
(HealthStatus=%v) but service %s is nil. Endpoint will be filtered in EDS.",
+ a, healthStatus,
svcNamespacedName.Name)
+ }
+ }
+
+ // CRITICAL: Verify the endpoint was created
with correct ServicePortName
+ if dubboEndpoint != nil {
+
klog.V(2).InfoS("updateEndpointCacheForSlice: created endpoint with
ServicePortName='%s', EndpointPort=%d, address=%s",
+ dubboEndpoint.ServicePortName,
dubboEndpoint.EndpointPort, dubboEndpoint.FirstAddressOrNil())
+ } else {
+
klog.Errorf("updateEndpointCacheForSlice: buildDubboEndpoint returned nil for
address=%s, targetPortNum=%d, portName='%s'",
+ a, targetPortNum, portName)
+ }
if len(overrideAddresses) > 1 {
dubboEndpoint.Addresses =
overrideAddresses
}
@@ -233,16 +397,25 @@ func (e *endpointSliceCache) update(hostname host.Name,
slice string, endpoints
}
func endpointHealthStatus(svc *model.Service, e v1.Endpoint)
model.HealthStatus {
+ // CRITICAL FIX: Correct health status logic
+ // 1. If Ready is nil or true, endpoint is healthy
+ // 2. If Ready is false, check if it's terminating
+ // 3. If terminating, mark as Terminating
+ // 4. Otherwise, mark as UnHealthy
+
+ // Check Ready condition
if e.Conditions.Ready == nil || *e.Conditions.Ready {
return model.Healthy
}
- // If it is shutting down, mark it as terminating. This occurs
regardless of whether it was previously healthy or not.
- if svc != nil &&
- (e.Conditions.Terminating == nil || *e.Conditions.Terminating) {
+ // Ready is false, check if it's terminating
+ // CRITICAL FIX: Terminating should be checked only if it's not nil AND
true
+ // If Terminating is nil, it means the endpoint is not terminating
(it's just not ready)
+ if e.Conditions.Terminating != nil && *e.Conditions.Terminating {
return model.Terminating
}
+ // Ready is false and not terminating, mark as unhealthy
return model.UnHealthy
}
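
The corrected endpointHealthStatus maps EndpointSlice conditions as follows: a nil or true Ready means Healthy, Ready=false with Terminating explicitly true means Terminating, and everything else is UnHealthy, so a merely not-ready pod is no longer mislabeled as terminating. A sketch of that mapping (status constants are stand-ins for the model package's values):

    package main

    import "fmt"

    type healthStatus string

    const (
        healthyStatus     healthStatus = "Healthy"
        terminatingStatus healthStatus = "Terminating"
        unhealthyStatus   healthStatus = "UnHealthy"
    )

    // endpointHealth reproduces the corrected mapping from EndpointSlice conditions:
    // a nil Ready counts as ready, and Terminating only applies when it is explicitly
    // true, so a pod that is merely not-ready is reported as UnHealthy.
    func endpointHealth(ready, terminating *bool) healthStatus {
        if ready == nil || *ready {
            return healthyStatus
        }
        if terminating != nil && *terminating {
            return terminatingStatus
        }
        return unhealthyStatus
    }

    func main() {
        f, t := false, true
        fmt.Println(endpointHealth(nil, nil)) // Healthy: Ready defaults to true
        fmt.Println(endpointHealth(&f, nil))  // UnHealthy: not ready, not terminating
        fmt.Println(endpointHealth(&f, &t))   // Terminating: shutting down
    }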
diff --git a/sail/pkg/xds/ads.go b/sail/pkg/xds/ads.go
index e0839467..0d566fc3 100644
--- a/sail/pkg/xds/ads.go
+++ b/sail/pkg/xds/ads.go
@@ -18,6 +18,7 @@
package xds
import (
+ "fmt"
"strconv"
"strings"
"sync/atomic"
@@ -247,7 +248,10 @@ func (s *DiscoveryServer) Stream(stream DiscoveryStream)
error {
peerAddr = peerInfo.Addr.String()
}
- // TODO WaitForRequestLimit?
+ if err := s.WaitForRequestLimit(stream.Context()); err != nil {
+ klog.Warningf("ADS: %q exceeded rate limit: %v", peerAddr, err)
+ return status.Errorf(codes.ResourceExhausted, "request rate
limit exceeded: %v", err)
+ }
ids, err := s.authenticate(ctx)
if err != nil {
@@ -313,8 +317,13 @@ func (s *DiscoveryServer) pushConnection(con *Connection,
pushEv *Event) error {
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con
*Connection) error {
stype := v3.GetShortType(req.TypeUrl)
- klog.V(2).Infof("ADS:%s: REQ %s resources:%d nonce:%s ", stype,
- con.ID(), len(req.ResourceNames), req.ResponseNonce)
+ // Log requested resource names
+ resourceNamesStr := ""
+ if len(req.ResourceNames) > 0 {
+ resourceNamesStr = fmt.Sprintf(" [%s]",
strings.Join(req.ResourceNames, ", "))
+ }
+ klog.Infof("ADS:%s: REQ %s resources:%d nonce:%s%s", stype,
+ con.ID(), len(req.ResourceNames), req.ResponseNonce,
resourceNamesStr)
if req.TypeUrl == v3.HealthInfoType {
return nil
}
@@ -330,6 +339,20 @@ func (s *DiscoveryServer) processRequest(req
*discovery.DiscoveryRequest, con *C
return nil
}
+ // CRITICAL FIX: For proxyless gRPC, if client sends wildcard (empty
ResourceNames) after receiving specific resources,
+ // this is likely an ACK and we should NOT push all resources again
+ // Check if this is a wildcard request after specific resources were
sent
+ watchedResource := con.proxy.GetWatchedResource(req.TypeUrl)
+ if con.proxy.IsProxylessGrpc() && len(req.ResourceNames) == 0 &&
watchedResource != nil && len(watchedResource.ResourceNames) > 0 &&
watchedResource.NonceSent != "" {
+ // This is a wildcard ACK after specific resources were sent
+ // ShouldRespond should have returned false, but we check here
as safety net
+ // Update the WatchedResource to reflect the ACK, but don't push
+ klog.V(2).Infof("%s: proxyless gRPC wildcard ACK after specific
resources (prev: %d resources, nonce: %s), skipping push",
+ stype, len(watchedResource.ResourceNames),
watchedResource.NonceSent)
+ // ShouldRespond should have already handled this, but we
ensure no push happens
+ return nil
+ }
+
// Log PUSH request BEFORE calling pushXds
if len(req.ResourceNames) > 0 {
klog.Infof("%s: PUSH request for node:%s resources:%d", stype,
con.ID(), len(req.ResourceNames))
@@ -337,15 +360,30 @@ func (s *DiscoveryServer) processRequest(req
*discovery.DiscoveryRequest, con *C
klog.Infof("%s: PUSH request for node:%s", stype, con.ID())
}
+ // Don't set Forced=true for regular proxy requests to avoid
unnecessary ServiceTargets recomputation
+ // Only set Forced for debug requests or when explicitly needed
+ // This prevents ServiceTargets from being recomputed on every request,
which can cause
+ // inconsistent listener generation and push loops
request := &model.PushRequest{
Full: true,
Push: con.proxy.LastPushContext,
Reason: model.NewReasonStats(model.ProxyRequest),
Start: con.proxy.LastPushTime,
Delta: delta,
- Forced: true,
+ Forced: false, // Only recompute ServiceTargets when
ConfigsUpdated indicates service changes
+ }
+
+ // CRITICAL FIX: Get WatchedResource after ShouldRespond has created it
+ // ShouldRespond may have created a new WatchedResource for first-time
requests
+ w := con.proxy.GetWatchedResource(req.TypeUrl)
+ if w == nil {
+ // This should not happen if ShouldRespond returned true, but
handle it gracefully
+ klog.Warningf("%s: WatchedResource is nil for %s after
ShouldRespond returned true, creating it", stype, con.ID())
+ con.proxy.NewWatchedResource(req.TypeUrl, req.ResourceNames)
+ w = con.proxy.GetWatchedResource(req.TypeUrl)
}
- return s.pushXds(con, con.proxy.GetWatchedResource(req.TypeUrl),
request)
+
+ return s.pushXds(con, w, request)
}
func newConnection(peerAddr string, stream DiscoveryStream) *Connection {
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index 6517c644..a77cf866 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -243,8 +243,17 @@ func (s *DiscoveryServer) dropCacheForRequest(req
*model.PushRequest) {
// If we don't know what updated, cannot safely cache. Clear the whole
cache
if req.Forced {
s.Cache.ClearAll()
+ klog.V(2).Infof("dropCacheForRequest: cleared all cache
(Forced=true)")
} else {
// Otherwise, just clear the updated configs
+ // CRITICAL: Log cache clear for debugging
+ if len(req.ConfigsUpdated) > 0 {
+ configs := make([]string, 0, len(req.ConfigsUpdated))
+ for ckey := range req.ConfigsUpdated {
+ configs = append(configs, ckey.String())
+ }
+ klog.V(3).Infof("dropCacheForRequest: clearing cache
for configs: %v", configs)
+ }
s.Cache.Clear(req.ConfigsUpdated)
}
}
diff --git a/sail/pkg/xds/eds.go b/sail/pkg/xds/eds.go
index cbed1d15..f33f1d4c 100644
--- a/sail/pkg/xds/eds.go
+++ b/sail/pkg/xds/eds.go
@@ -8,7 +8,9 @@ import (
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/util/protoconv"
"github.com/apache/dubbo-kubernetes/sail/pkg/xds/endpoints"
+ endpoint
"github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ "k8s.io/klog/v2"
)
var _ model.XdsDeltaResourceGenerator = &EdsGenerator{}
@@ -105,17 +107,45 @@ func (eds *EdsGenerator) buildEndpoints(proxy
*model.Proxy,
// see
https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#grouping-resources-into-responses
if !req.Full || canSendPartialFullPushes(req) {
edsUpdatedServices =
model.ConfigNamesOfKind(req.ConfigsUpdated, kind.ServiceEntry)
+ if len(edsUpdatedServices) > 0 {
+ klog.V(3).Infof("buildEndpoints: edsUpdatedServices
count=%d (Full=%v)", len(edsUpdatedServices), req.Full)
+ }
}
var resources model.Resources
empty := 0
cached := 0
regenerated := 0
for clusterName := range w.ResourceNames {
+ hostname := model.ParseSubsetKeyHostname(clusterName)
+ // CRITICAL FIX: Always check if this service was updated, even
if edsUpdatedServices is nil
+ // For incremental pushes, we need to check ConfigsUpdated
directly to ensure cache is cleared
+ serviceWasUpdated := false
+ if req.ConfigsUpdated != nil {
+ // CRITICAL: Log all ConfigsUpdated entries for
debugging hostname matching
+ configKeys := make([]string, 0, len(req.ConfigsUpdated))
+ for ckey := range req.ConfigsUpdated {
+ configKeys = append(configKeys,
fmt.Sprintf("%s/%s/%s", ckey.Kind, ckey.Namespace, ckey.Name))
+ if ckey.Kind == kind.ServiceEntry {
+ // CRITICAL: Match ConfigsUpdated.Name
with hostname
+ // Both should be FQDN (e.g.,
"consumer.grpc-proxyless.svc.cluster.local")
+ if ckey.Name == hostname {
+ serviceWasUpdated = true
+
klog.V(2).Infof("buildEndpoints: service %s was updated, forcing regeneration",
hostname)
+ break
+ }
+ }
+ }
+ }
+
if edsUpdatedServices != nil {
- if _, ok :=
edsUpdatedServices[model.ParseSubsetKeyHostname(clusterName)]; !ok {
+ if _, ok := edsUpdatedServices[hostname]; !ok {
// Cluster was not updated, skip recomputing.
This happens when we get an incremental update for a
// specific Hostname. On connect or for full
push edsUpdatedServices will be empty.
- continue
+ if !serviceWasUpdated {
+ continue
+ }
+ } else {
+ serviceWasUpdated = true
}
}
builder := endpoints.NewEndpointBuilder(clusterName, proxy,
req.Push)
@@ -123,8 +153,25 @@ func (eds *EdsGenerator) buildEndpoints(proxy *model.Proxy,
continue
}
- // Try to get from cache
- if eds.Cache != nil {
+		// CRITICAL FIX: for incremental pushes (Full=false) we must regenerate EDS whenever
+		// ConfigsUpdated contains this service, because endpoint health may have changed and
+		// the cache can hold stale (possibly empty) endpoints. For full pushes the cache may
+		// be used when the service was not updated.
+ shouldRegenerate := serviceWasUpdated
+ if !shouldRegenerate && !req.Full && req.ConfigsUpdated != nil {
+ // For incremental pushes, check if any ServiceEntry in
ConfigsUpdated matches this hostname
+ for ckey := range req.ConfigsUpdated {
+ if ckey.Kind == kind.ServiceEntry && ckey.Name
== hostname {
+ shouldRegenerate = true
+ break
+ }
+ }
+ }
+
+ // Try to get from cache only if we don't need to regenerate
+ if !shouldRegenerate && eds.Cache != nil {
cachedEndpoint := eds.Cache.Get(builder)
if cachedEndpoint != nil {
resources = append(resources, cachedEndpoint)
@@ -135,8 +182,17 @@ func (eds *EdsGenerator) buildEndpoints(proxy *model.Proxy,
// generate eds from beginning
l := builder.BuildClusterLoadAssignment(eds.EndpointIndex)
+		// CRITICAL FIX: BuildClusterLoadAssignment returns nil when no endpoints are found,
+		// but proxyless gRPC clients still need an (empty) ClusterLoadAssignment to learn the
+		// current endpoint state, so build one here instead of skipping the cluster.
if l == nil {
- continue
+ // Build an empty ClusterLoadAssignment for this cluster
+ // Use the same logic as
endpoint_builder.go:buildEmptyClusterLoadAssignment
+ l = &endpoint.ClusterLoadAssignment{
+ ClusterName: clusterName,
+ }
}
regenerated++
diff --git a/sail/pkg/xds/endpoints/endpoint_builder.go
b/sail/pkg/xds/endpoints/endpoint_builder.go
index 4635dcff..aa5a870d 100644
--- a/sail/pkg/xds/endpoints/endpoint_builder.go
+++ b/sail/pkg/xds/endpoints/endpoint_builder.go
@@ -20,11 +20,14 @@ package endpoints
import (
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/labels"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/networking/util"
+ "github.com/cespare/xxhash/v2"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint
"github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"google.golang.org/protobuf/types/known/wrapperspb"
+ "k8s.io/klog/v2"
)
var _ model.XdsCacheEntry = &EndpointBuilder{}
@@ -80,9 +83,22 @@ func (b *EndpointBuilder)
BuildClusterLoadAssignment(endpointIndex *model.Endpoi
// Get endpoints from the endpoint index
shards, ok := endpointIndex.ShardsForService(string(b.hostname),
b.service.Attributes.Namespace)
if !ok {
+ klog.Warningf("BuildClusterLoadAssignment: no shards found for
service %s in namespace %s (cluster=%s, port=%d, svcPort.Name='%s',
svcPort.Port=%d)",
+ b.hostname, b.service.Attributes.Namespace,
b.clusterName, b.port, svcPort.Name, svcPort.Port)
return buildEmptyClusterLoadAssignment(b.clusterName)
}
+ // CRITICAL: Log shards info before processing
+ shards.RLock()
+ shardCount := len(shards.Shards)
+ totalEndpointsInShards := 0
+ for _, eps := range shards.Shards {
+ totalEndpointsInShards += len(eps)
+ }
+ shards.RUnlock()
+ klog.V(3).Infof("BuildClusterLoadAssignment: found shards for service
%s (cluster=%s, port=%d, shardCount=%d, totalEndpointsInShards=%d)",
+ b.hostname, b.clusterName, b.port, shardCount,
totalEndpointsInShards)
+
// Build port map for filtering
portMap := map[string]int{}
for _, port := range b.service.Ports {
@@ -99,10 +115,17 @@ func (b *EndpointBuilder)
BuildClusterLoadAssignment(endpointIndex *model.Endpoi
// Filter and convert endpoints
var lbEndpoints []*endpoint.LbEndpoint
+ var filteredCount int
+ var totalEndpoints int
+
for _, eps := range shards.Shards {
for _, ep := range eps {
+ totalEndpoints++
// Filter by port name
if ep.ServicePortName != svcPort.Name {
+ filteredCount++
+ klog.V(3).Infof("BuildClusterLoadAssignment:
filtering out endpoint %s (port name mismatch: '%s' != '%s')",
+ ep.FirstAddressOrNil(),
ep.ServicePortName, svcPort.Name)
continue
}
@@ -112,19 +135,33 @@ func (b *EndpointBuilder)
BuildClusterLoadAssignment(endpointIndex *model.Endpoi
}
// Filter out unhealthy endpoints unless service
supports them
+			// An unhealthy endpoint is kept only when the Service sets
+			// publishNotReadyAddresses=true (SupportsUnhealthyEndpoints). That is also how a
+			// proxyless gRPC client opts in to receiving not-yet-ready endpoints and handles
+			// connection failures itself.
if !b.service.SupportsUnhealthyEndpoints() &&
ep.HealthStatus == model.UnHealthy {
+ klog.V(3).Infof("BuildClusterLoadAssignment:
filtering out unhealthy endpoint %s", ep.FirstAddressOrNil())
+ filteredCount++
continue
}
// Build LbEndpoint
lbEp := b.buildLbEndpoint(ep)
- if lbEp != nil {
- lbEndpoints = append(lbEndpoints, lbEp)
+ if lbEp == nil {
+ klog.V(3).Infof("BuildClusterLoadAssignment:
buildLbEndpoint returned nil for endpoint %s", ep.FirstAddressOrNil())
+ filteredCount++
+ continue
}
+ lbEndpoints = append(lbEndpoints, lbEp)
}
}
if len(lbEndpoints) == 0 {
+ klog.V(2).Infof("BuildClusterLoadAssignment: no endpoints found
for cluster %s (hostname=%s, port=%d, totalEndpoints=%d, filteredCount=%d)",
+ b.clusterName, b.hostname, b.port, totalEndpoints,
filteredCount)
return buildEmptyClusterLoadAssignment(b.clusterName)
}
@@ -216,18 +253,28 @@ func (b *EndpointBuilder) Cacheable() bool {
// Key implements model.XdsCacheEntry
func (b *EndpointBuilder) Key() any {
- // Simple key based on cluster name for now
- // TODO: implement proper hashing if needed for cache optimization
- return b.clusterName
+ // CRITICAL FIX: EDS cache expects uint64 key, not string
+ // Hash the cluster name to uint64 to match the cache type
+ return xxhash.Sum64String(b.clusterName)
}
// Type implements model.XdsCacheEntry
func (b *EndpointBuilder) Type() string {
- return "EDS"
+ return "eds" // Must match model.EDSType constant ("eds", not "EDS")
}
// DependentConfigs implements model.XdsCacheEntry
func (b *EndpointBuilder) DependentConfigs() []model.ConfigHash {
- // Return empty slice for now - can be enhanced to return ServiceEntry
config hash if needed
- return nil
+ // CRITICAL FIX: Return ServiceEntry ConfigHash so that EDS cache can
be properly cleared
+ // when endpoints are updated. Without this, cache.Clear() cannot find
and remove stale EDS entries.
+ if b.service == nil {
+ return nil
+ }
+ // Create ConfigKey for ServiceEntry and return its hash
+ configKey := model.ConfigKey{
+ Kind: kind.ServiceEntry,
+ Name: string(b.hostname),
+ Namespace: b.service.Attributes.Namespace,
+ }
+ return []model.ConfigHash{configKey.HashCode()}
}
diff --git a/sail/pkg/xds/lds.go b/sail/pkg/xds/lds.go
index e404dbb6..4d434bb2 100644
--- a/sail/pkg/xds/lds.go
+++ b/sail/pkg/xds/lds.go
@@ -24,13 +24,21 @@ func ldsNeedsPush(proxy *model.Proxy, req
*model.PushRequest) bool {
return false
}
-func (l LdsGenerator) Generate(proxy *model.Proxy, _ *model.WatchedResource,
req *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
+func (l LdsGenerator) Generate(proxy *model.Proxy, w *model.WatchedResource,
req *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
if !ldsNeedsPush(proxy, req) {
return nil, model.DefaultXdsLogDetails, nil
}
+ // For standard ConfigGenerator (Envoy), BuildListeners doesn't take
names parameter
+ // The names filtering is handled by the resource filtering logic in
pushXds
listeners := l.ConfigGenerator.BuildListeners(proxy, req.Push)
resources := model.Resources{}
for _, c := range listeners {
+ // Filter resources based on WatchedResource.ResourceNames if
specified
+ if w != nil && w.ResourceNames != nil && len(w.ResourceNames) >
0 {
+ if !w.ResourceNames.Contains(c.Name) {
+ continue
+ }
+ }
resources = append(resources, &discovery.Resource{
Name: c.Name,
Resource: protoconv.MessageToAny(c),
diff --git a/sail/pkg/xds/xdsgen.go b/sail/pkg/xds/xdsgen.go
index e78afe5d..611d8729 100644
--- a/sail/pkg/xds/xdsgen.go
+++ b/sail/pkg/xds/xdsgen.go
@@ -6,13 +6,16 @@ import (
"strconv"
"strings"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/env"
"github.com/apache/dubbo-kubernetes/pkg/lazy"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
dubboversion "github.com/apache/dubbo-kubernetes/pkg/version"
"github.com/apache/dubbo-kubernetes/pkg/xds"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/networking/util"
v3 "github.com/apache/dubbo-kubernetes/sail/pkg/xds/v3"
+ cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"k8s.io/klog/v2"
@@ -64,6 +67,91 @@ func (s *DiscoveryServer) pushXds(con *Connection, w
*model.WatchedResource, req
return nil
}
+	// CRITICAL FIX: for proxyless gRPC, handle wildcard (empty ResourceNames) requests carefully.
+	// A wildcard request that arrives after specific resources were delivered is treated as an
+	// ACK and nothing new is generated or pushed. For an initial wildcard request, the resource
+	// names are instead derived from the parent resources (cluster names from LDS for CDS, EDS
+	// cluster names from CDS for EDS).
+ var requestedResourceNames sets.String
+ var useLastSentResources bool
+ if con.proxy.IsProxylessGrpc() {
+ // Check if this is a wildcard request (empty ResourceNames)
but we have previously sent resources
+ if len(w.ResourceNames) == 0 && w.NonceSent != "" {
+			// Likely an ACK after specific resources were received; mark it so that no new
+			// resources are generated or pushed for this request.
+			useLastSentResources = true
+ klog.V(2).Infof("pushXds: proxyless gRPC wildcard
request with NonceSent=%s, will use last sent resources", w.NonceSent)
+ } else if len(w.ResourceNames) == 0 && w.NonceSent == "" {
+ // CRITICAL FIX: Initial wildcard request - need to
extract resource names from parent resources
+ // For CDS: extract cluster names from LDS
+ // For EDS: extract cluster names from CDS
+ if w.TypeUrl == v3.ClusterType {
+ // Extract cluster names from LDS response
+ ldsWatched :=
con.proxy.GetWatchedResource(v3.ListenerType)
+ if ldsWatched != nil && ldsWatched.NonceSent !=
"" {
+ // LDS has been sent, extract cluster
names from it
+ // We need to regenerate LDS to extract
cluster names, or store them
+ // For now, let's extract from the
proxy's ServiceTargets or from LDS generation
+ klog.V(2).Infof("pushXds: CDS wildcard
request, extracting cluster names from LDS")
+ // Create a temporary request to
generate LDS and extract cluster names
+ ldsReq := &model.PushRequest{
+ Full: true,
+ Push: req.Push,
+ Reason:
model.NewReasonStats(model.DependentResource),
+ Start: con.proxy.LastPushTime,
+ Forced: false,
+ }
+ ldsGen :=
s.findGenerator(v3.ListenerType, con)
+ if ldsGen != nil {
+ ldsRes, _, _ :=
ldsGen.Generate(con.proxy, ldsWatched, ldsReq)
+ if len(ldsRes) > 0 {
+ clusterNames :=
extractClusterNamesFromLDS(ldsRes)
+ if len(clusterNames) >
0 {
+ w.ResourceNames
= sets.New(clusterNames...)
+
requestedResourceNames = sets.New[string]()
+
requestedResourceNames.InsertAll(clusterNames...)
+
klog.V(2).Infof("pushXds: extracted %d cluster names from LDS: %v",
len(clusterNames), clusterNames)
+ }
+ }
+ }
+ }
+ } else if w.TypeUrl == v3.EndpointType {
+ // Extract cluster names from CDS response
+ cdsWatched :=
con.proxy.GetWatchedResource(v3.ClusterType)
+ if cdsWatched != nil && cdsWatched.NonceSent !=
"" {
+ // CDS has been sent, extract EDS
cluster names from it
+ klog.V(2).Infof("pushXds: EDS wildcard
request, extracting cluster names from CDS")
+ // Create a temporary request to
generate CDS and extract EDS cluster names
+ cdsReq := &model.PushRequest{
+ Full: true,
+ Push: req.Push,
+ Reason:
model.NewReasonStats(model.DependentResource),
+ Start: con.proxy.LastPushTime,
+ Forced: false,
+ }
+ cdsGen :=
s.findGenerator(v3.ClusterType, con)
+ if cdsGen != nil {
+ cdsRes, _, _ :=
cdsGen.Generate(con.proxy, cdsWatched, cdsReq)
+ if len(cdsRes) > 0 {
+ edsClusterNames :=
extractEDSClusterNamesFromCDS(cdsRes)
+ if len(edsClusterNames)
> 0 {
+ w.ResourceNames
= sets.New(edsClusterNames...)
+
requestedResourceNames = sets.New[string]()
+
requestedResourceNames.InsertAll(edsClusterNames...)
+
klog.V(2).Infof("pushXds: extracted %d EDS cluster names from CDS: %v",
len(edsClusterNames), edsClusterNames)
+ }
+ }
+ }
+ }
+ }
+ } else if len(w.ResourceNames) > 0 {
+ // Specific resource request
+ requestedResourceNames = sets.New[string]()
+
requestedResourceNames.InsertAll(w.ResourceNames.UnsortedList()...)
+ }
+ }
+
// If delta is set, client is requesting new resources or removing old
ones. We should just generate the
// new resources it needs, rather than the entire set of known
resources.
// Note: we do not need to account for unsubscribed resources as these
are handled by parent removal;
@@ -78,7 +166,19 @@ func (s *DiscoveryServer) pushXds(con *Connection, w
*model.WatchedResource, req
}
}
- res, logdata, err := gen.Generate(con.proxy, w, req)
+ // For proxyless gRPC wildcard requests with previous NonceSent, use
last sent resources
+ var res model.Resources
+ var logdata model.XdsLogDetails
+ var err error
+ if useLastSentResources {
+ // Don't generate new resources, return empty and we'll handle
it below
+ res = nil
+ logdata = model.DefaultXdsLogDetails
+ err = nil
+ } else {
+ res, logdata, err = gen.Generate(con.proxy, w, req)
+ }
+
info := ""
if len(logdata.AdditionalInfo) > 0 {
info = " " + logdata.AdditionalInfo
@@ -86,15 +186,69 @@ func (s *DiscoveryServer) pushXds(con *Connection, w
*model.WatchedResource, req
if len(logFiltered) > 0 {
info += logFiltered
}
- if err != nil || res == nil {
+ if err != nil {
return err
}
+	// CRITICAL FIX: a proxyless gRPC wildcard request with a previous NonceSent is an ACK of the
+	// last push, so nothing was regenerated for it.
+	if useLastSentResources && res == nil {
+		// The client already has the resources from the previous push. ShouldRespond should
+		// have filtered this out, but skip the push here as a safety net.
+ klog.V(2).Infof("pushXds: proxyless gRPC wildcard ACK with
NonceSent=%s, skipping push (client already has resources from previous push)",
w.NonceSent)
+ return nil
+ }
+
+ if res == nil {
+ return nil
+ }
+
+ // CRITICAL FIX: For proxyless gRPC, filter resources to only include
requested ones
+ // This prevents the push loop where client requests 1 resource but
receives 13/14
+ var filteredRes model.Resources
+ if con.proxy.IsProxylessGrpc() {
+ if len(requestedResourceNames) > 0 {
+ // Filter to only requested resources
+ filteredRes = make(model.Resources, 0, len(res))
+ for _, r := range res {
+ if requestedResourceNames.Contains(r.Name) {
+ filteredRes = append(filteredRes, r)
+ } else {
+ klog.V(2).Infof("pushXds: filtering out
unrequested resource %s for proxyless gRPC (requested: %v)", r.Name,
requestedResourceNames.UnsortedList())
+ }
+ }
+			generatedCount := len(res)
+			if len(filteredRes) != len(res) {
+				info += " filtered:" + strconv.Itoa(len(res)-len(filteredRes))
+				res = filteredRes
+			}
+			// If filtering removed everything even though the client asked for specific
+			// resources, the requested resources do not exist; skip the push instead of
+			// sending an empty response, which would restart the request loop.
+			if len(res) == 0 && len(requestedResourceNames) > 0 {
+				klog.Warningf("pushXds: proxyless gRPC requested %d resources but none matched after filtering (requested: %v, generated before filter: %d). Skipping push to avoid loop.",
+					len(requestedResourceNames), requestedResourceNames.UnsortedList(), generatedCount)
+ return nil
+ }
+ } else if len(w.ResourceNames) == 0 {
+ // Wildcard request without previous NonceSent - this
is initial request
+ // Allow generating all resources for initial connection
+ klog.V(2).Infof("pushXds: proxyless gRPC initial
wildcard request, generating all resources")
+ }
+ }
+
+ // CRITICAL: Never send empty response for proxyless gRPC - this causes
push loops
+ // If we have no resources to send, return nil instead of sending empty
response
+ if len(res) == 0 {
+ klog.V(2).Infof("pushXds: no resources to send for %s (proxy:
%s), skipping push", w.TypeUrl, con.proxy.ID)
+ return nil
+ }
+
+ nonceValue := nonce(req.Push.PushVersion)
resp := &discovery.DiscoveryResponse{
ControlPlane: ControlPlane(w.TypeUrl),
TypeUrl: w.TypeUrl,
VersionInfo: req.Push.PushVersion,
- Nonce: nonce(req.Push.PushVersion),
+ Nonce: nonceValue,
Resources: xds.ResourcesToAny(res),
}
@@ -107,17 +261,216 @@ func (s *DiscoveryServer) pushXds(con *Connection, w
*model.WatchedResource, req
return err
}
+ // CRITICAL FIX: Update NonceSent after successfully sending the
response
+ // This is essential for tracking which nonce was sent and preventing
push loops
+ con.proxy.UpdateWatchedResource(w.TypeUrl, func(wr
*model.WatchedResource) *model.WatchedResource {
+ if wr == nil {
+ return nil
+ }
+ wr.NonceSent = nonceValue
+ // Also update ResourceNames to match what we actually sent
(for proxyless gRPC)
+ if con.proxy.IsProxylessGrpc() && res != nil {
+ sentNames := sets.New[string]()
+ for _, r := range res {
+ sentNames.Insert(r.Name)
+ }
+ // Only update if we sent different resources than
requested
+ if requestedResourceNames != nil {
+ // Compare sets by checking if they have the
same size and all elements match
+ if sentNames.Len() !=
requestedResourceNames.Len() {
+ wr.ResourceNames = sentNames
+ } else {
+ // Check if all sent names are in
requested names
+ allMatch := true
+ for name := range sentNames {
+ if
!requestedResourceNames.Contains(name) {
+ allMatch = false
+ break
+ }
+ }
+ if !allMatch {
+ wr.ResourceNames = sentNames
+ }
+ }
+ }
+ }
+ return wr
+ })
+
switch {
case !req.Full:
default:
// Log format matches Istio: "LDS: PUSH for node:xxx
resources:1 size:342B"
- klog.Infof("%s: %s for node:%s resources:%d size:%s",
v3.GetShortType(w.TypeUrl), ptype, con.proxy.ID, len(res),
- util.ByteCount(ResourceSize(res)))
+ resourceNamesStr := ""
+ if len(res) > 0 && len(res) <= 5 {
+ // Log resource names if there are few resources (for
debugging)
+ names := make([]string, 0, len(res))
+ for _, r := range res {
+ names = append(names, r.Name)
+ }
+ resourceNamesStr = fmt.Sprintf(" [%s]",
strings.Join(names, ", "))
+ }
+ klog.Infof("%s: %s for node:%s resources:%d size:%s%s%s",
v3.GetShortType(w.TypeUrl), ptype, con.proxy.ID, len(res),
+ util.ByteCount(ResourceSize(res)), info,
resourceNamesStr)
+ }
+
+ // CRITICAL FIX: For proxyless gRPC, after pushing LDS with outbound
listeners,
+ // automatically trigger CDS push for the referenced clusters ONLY if
this is a direct request push
+ // (not a push from pushConnection which would cause loops)
+ // Only auto-push if CDS is not already being watched by the client
(client will request it naturally)
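+	// For example (names taken from the bundled sample, for illustration only): an LDS push that
+	// contains the outbound listener "consumer.grpc-proxyless.svc.cluster.local:7070" leads to a
+	// CDS push for "outbound|7070||consumer.grpc-proxyless.svc.cluster.local", which in turn
+	// triggers an EDS push for that same cluster name.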
+ if w.TypeUrl == v3.ListenerType && con.proxy.IsProxylessGrpc() &&
len(res) > 0 {
+ // Only auto-push CDS if this is a direct request (not a full
push from pushConnection)
+ // Check if this push was triggered by a direct client request
using IsRequest()
+ isDirectRequest := req.IsRequest()
+ if isDirectRequest {
+ clusterNames := extractClusterNamesFromLDS(res)
+ if len(clusterNames) > 0 {
+ cdsWatched :=
con.proxy.GetWatchedResource(v3.ClusterType)
+ // Only auto-push CDS if client hasn't already
requested it
+ // If client has already requested CDS
(WatchedResource exists with ResourceNames),
+ // the client's request will handle it, so we
don't need to auto-push
+ if cdsWatched == nil ||
cdsWatched.ResourceNames == nil || len(cdsWatched.ResourceNames) == 0 {
+ // Client hasn't requested CDS yet,
auto-push to ensure client gets the cluster config
+
con.proxy.NewWatchedResource(v3.ClusterType, clusterNames)
+ klog.V(2).Infof("pushXds: LDS push
completed, auto-pushing CDS for clusters: %v", clusterNames)
+ // Trigger CDS push directly without
going through pushConnection to avoid loops
+ cdsReq := &model.PushRequest{
+ Full: true,
+ Push: req.Push,
+ Reason:
model.NewReasonStats(model.ProxyRequest),
+ Start: con.proxy.LastPushTime,
+ Forced: false,
+ }
+ if err := s.pushXds(con,
con.proxy.GetWatchedResource(v3.ClusterType), cdsReq); err != nil {
+ klog.Warningf("pushXds: failed
to push CDS after LDS: %v", err)
+ }
+ } else {
+ // Client has already requested CDS,
let the client's request handle it
+ klog.V(2).Infof("pushXds: LDS push
completed, client already requested CDS, skipping auto-push")
+ }
+ }
+ }
+ }
+
+ // CRITICAL FIX: For proxyless gRPC, after pushing CDS with EDS
clusters,
+ // automatically trigger EDS push for the referenced endpoints
+ // This is necessary for load balancing - client needs EDS to discover
all available endpoints
+ if w.TypeUrl == v3.ClusterType && con.proxy.IsProxylessGrpc() &&
len(res) > 0 {
+ // Extract EDS cluster names from CDS resources
+ edsClusterNames := extractEDSClusterNamesFromCDS(res)
+ if len(edsClusterNames) > 0 {
+ edsWatched :=
con.proxy.GetWatchedResource(v3.EndpointType)
+ shouldPushEDS := false
+ if edsWatched == nil {
+ // EDS not watched yet, create watched resource
+ con.proxy.NewWatchedResource(v3.EndpointType,
edsClusterNames)
+ shouldPushEDS = true
+ klog.V(2).Infof("pushXds: CDS push completed,
auto-pushing EDS for clusters: %v", edsClusterNames)
+ } else {
+ // Check if any cluster names are missing from
the watched set
+ existingNames := edsWatched.ResourceNames
+ if existingNames == nil {
+ existingNames = sets.New[string]()
+ }
+ hasNewClusters := false
+ for _, cn := range edsClusterNames {
+ if !existingNames.Contains(cn) {
+ hasNewClusters = true
+ break
+ }
+ }
+ if hasNewClusters {
+ // Update EDS watched resource to
include the new cluster names
+
con.proxy.UpdateWatchedResource(v3.EndpointType, func(wr
*model.WatchedResource) *model.WatchedResource {
+ if wr == nil {
+ wr =
&model.WatchedResource{TypeUrl: v3.EndpointType, ResourceNames:
sets.New[string]()}
+ }
+ existingNames :=
wr.ResourceNames
+ if existingNames == nil {
+ existingNames =
sets.New[string]()
+ }
+ for _, cn := range
edsClusterNames {
+ existingNames.Insert(cn)
+ }
+ wr.ResourceNames = existingNames
+ return wr
+ })
+ shouldPushEDS = true
+ klog.V(2).Infof("pushXds: CDS push
completed, updating EDS watched resource with new clusters: %v",
edsClusterNames)
+ } else {
+ klog.V(2).Infof("pushXds: CDS push
completed, EDS clusters already watched: %v", edsClusterNames)
+ }
+ }
+
+ // Only push EDS if we have new clusters to push
+ if shouldPushEDS {
+ // Trigger EDS push directly without going
through pushConnection to avoid loops
+ edsReq := &model.PushRequest{
+ Full: true,
+ Push: req.Push,
+ Reason:
model.NewReasonStats(model.DependentResource),
+ Start: con.proxy.LastPushTime,
+ Forced: true, // Force EDS push to
ensure endpoints are available for load balancing
+ }
+ if err := s.pushXds(con,
con.proxy.GetWatchedResource(v3.EndpointType), edsReq); err != nil {
+ klog.Warningf("pushXds: failed to push
EDS after CDS: %v", err)
+ }
+ }
+ }
}
return nil
}
+// extractEDSClusterNamesFromCDS extracts EDS cluster names from CDS cluster
resources
+// Only clusters with ClusterDiscoveryType=EDS are returned
+func extractEDSClusterNamesFromCDS(clusters model.Resources) []string {
+ clusterNames := sets.New[string]()
+ for _, r := range clusters {
+ // Unmarshal the cluster resource to check its type
+ cl := &cluster.Cluster{}
+ if err := r.Resource.UnmarshalTo(cl); err != nil {
+ klog.V(2).Infof("extractEDSClusterNamesFromCDS: failed
to unmarshal cluster %s: %v", r.Name, err)
+ continue
+ }
+ // Check if this is an EDS cluster
+ if cl.ClusterDiscoveryType != nil {
+ if edsType, ok :=
cl.ClusterDiscoveryType.(*cluster.Cluster_Type); ok && edsType.Type ==
cluster.Cluster_EDS {
+ // This is an EDS cluster, add to the list
+ clusterNames.Insert(cl.Name)
+ }
+ }
+ }
+ return clusterNames.UnsortedList()
+}
+
+// extractClusterNamesFromLDS extracts cluster names referenced in LDS
listener resources
+// For outbound listeners with name format "hostname:port", the cluster name
is "outbound|port||hostname"
+func extractClusterNamesFromLDS(listeners model.Resources) []string {
+ clusterNames := sets.New[string]()
+ for _, r := range listeners {
+ // Parse listener name to extract cluster name
+ // Outbound listener format: "hostname:port" -> cluster:
"outbound|port||hostname"
+ // Inbound listener format:
"xds.dubbo.io/grpc/lds/inbound/[::]:port" -> no cluster
+ listenerName := r.Name
+ if strings.Contains(listenerName, ":") &&
!strings.HasPrefix(listenerName, "xds.dubbo.io/grpc/lds/inbound/") {
+ // This is an outbound listener
+ parts := strings.Split(listenerName, ":")
+ if len(parts) == 2 {
+ hostname := parts[0]
+ portStr := parts[1]
+ port, err := strconv.Atoi(portStr)
+ if err == nil {
+ // Build cluster name:
outbound|port||hostname
+ clusterName :=
model.BuildSubsetKey(model.TrafficDirectionOutbound, "", host.Name(hostname),
port)
+ clusterNames.Insert(clusterName)
+ }
+ }
+ }
+ }
+ return clusterNames.UnsortedList()
+}
+
func ResourceSize(r model.Resources) int {
size := 0
for _, r := range r {
diff --git a/samples/example/README.md b/samples/example/README.md
deleted file mode 100644
index 924c415c..00000000
--- a/samples/example/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# Example
\ No newline at end of file
diff --git a/samples/grpc-proxyless/grpc-proxyless.yaml
b/samples/grpc-proxyless/grpc-proxyless.yaml
new file mode 100644
index 00000000..f13deb9c
--- /dev/null
+++ b/samples/grpc-proxyless/grpc-proxyless.yaml
@@ -0,0 +1,107 @@
+apiVersion: v1
+kind: Service
+metadata:
+ labels:
+ app: consumer
+ name: consumer
+ namespace: grpc-proxyless
+spec:
+ selector:
+ app: consumer
+ type: ClusterIP
+ ports:
+ - name: grpc
+ port: 7070
+ targetPort: 17070
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: consumer
+ namespace: grpc-proxyless
+spec:
+ replicas: 2
+ selector:
+ matchLabels:
+ app: consumer
+ template:
+ metadata:
+ annotations:
+ proxyless.dubbo.io/inject: "true"
+ inject.dubbo.io/templates: grpc-agent
+ proxy.dubbo.io/config: '{"holdApplicationUntilProxyStarts": true}'
+ labels:
+ app: consumer
+ spec:
+ containers:
+ - name: app
+ image: mfordjody/grpc-consumer:latest
+ imagePullPolicy: Always
+ args:
+ - --port=17070
+ ports:
+ - containerPort: 17070
+ protocol: TCP
+ name: grpc
+ env:
+ - name: INSTANCE_IP
+ valueFrom:
+ fieldRef:
+ apiVersion: v1
+ fieldPath: status.podIP
+ readinessProbe:
+ tcpSocket:
+ port: 17070
+ initialDelaySeconds: 5
+ periodSeconds: 5
+ timeoutSeconds: 2
+ successThreshold: 1
+ failureThreshold: 3
+ livenessProbe:
+ tcpSocket:
+ port: 17070
+ initialDelaySeconds: 10
+ periodSeconds: 10
+ timeoutSeconds: 2
+ successThreshold: 1
+ failureThreshold: 3
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: producer
+ namespace: grpc-proxyless
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: producer
+ template:
+ metadata:
+ annotations:
+ proxyless.dubbo.io/inject: "true"
+ inject.dubbo.io/templates: grpc-agent
+ proxy.dubbo.io/config: '{"holdApplicationUntilProxyStarts": true}'
+ labels:
+ app: producer
+ spec:
+ containers:
+ - name: app
+ # Replace with your own image containing the producer binary
+ image: mfordjody/grpc-producer:latest
+ imagePullPolicy: Always
+ args:
+ - --port=17171
+ # Optional: uncomment to test direct connection
+ # - --target=xds:///consumer.grpc-proxyless.svc.cluster.local:7070
+ # - --count=10
+ ports:
+ - containerPort: 17171
+ protocol: TCP
+ name: grpc-test
+ env:
+ - name: INSTANCE_IP
+ valueFrom:
+ fieldRef:
+ apiVersion: v1
+ fieldPath: status.podIP
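+# Deployment sketch (assumes the dubbo control plane and the grpc-agent injection template are
+# already installed in the cluster; the image names above are placeholders you may need to replace):
+#   kubectl create namespace grpc-proxyless
+#   kubectl apply -f samples/grpc-proxyless/grpc-proxyless.yaml
+#   kubectl -n grpc-proxyless get pods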
diff --git a/test/grpc-proxyless/.dockerignore
b/test/grpc-proxyless/.dockerignore
new file mode 100644
index 00000000..ffa25274
--- /dev/null
+++ b/test/grpc-proxyless/.dockerignore
@@ -0,0 +1,6 @@
+*.md
+bin/
+*.pb.go
+.git/
+.gitignore
+README.md
\ No newline at end of file
diff --git a/test/grpc-proxyless/.gitignore b/test/grpc-proxyless/.gitignore
new file mode 100644
index 00000000..db441f49
--- /dev/null
+++ b/test/grpc-proxyless/.gitignore
@@ -0,0 +1,2 @@
+bin/
+*.pb.go
diff --git a/test/grpc-proxyless/Dockerfile.consumer
b/test/grpc-proxyless/Dockerfile.consumer
new file mode 100644
index 00000000..0ed7fd18
--- /dev/null
+++ b/test/grpc-proxyless/Dockerfile.consumer
@@ -0,0 +1,35 @@
+FROM golang:1.24-alpine AS builder
+
+WORKDIR /build
+
+RUN apk add --no-cache protobuf
+
+COPY go.mod go.sum ./
+RUN go mod download
+
+RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest && \
+ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
+
+ENV PATH=$PATH:/go/bin:/root/go/bin
+
+COPY proto/echo.proto ./proto/
+RUN protoc --go_out=. --go_opt=paths=source_relative \
+ --go-grpc_out=. --go-grpc_opt=paths=source_relative \
+ proto/echo.proto
+
+COPY consumer/ ./consumer/
+
+ARG GOOS=linux
+ARG GOARCH=amd64
+RUN GOOS=${GOOS} GOARCH=${GOARCH} CGO_ENABLED=0 go build -a -ldflags
'-extldflags "-static"' -o /build/grpc-consumer ./consumer/
+
+FROM alpine:latest
+RUN apk update && \
+ apk --no-cache add ca-certificates tzdata || \
+ (sleep 2 && apk update && apk --no-cache add ca-certificates tzdata)
+WORKDIR /app
+
+COPY --from=builder /build/grpc-consumer /usr/local/bin/grpc-consumer
+RUN chmod +x /usr/local/bin/grpc-consumer
+
+ENTRYPOINT ["/usr/local/bin/grpc-consumer"]
diff --git a/test/grpc-proxyless/Dockerfile.producer
b/test/grpc-proxyless/Dockerfile.producer
new file mode 100644
index 00000000..38add387
--- /dev/null
+++ b/test/grpc-proxyless/Dockerfile.producer
@@ -0,0 +1,35 @@
+FROM golang:1.24-alpine AS builder
+
+WORKDIR /build
+
+RUN apk add --no-cache protobuf
+
+COPY go.mod go.sum ./
+RUN go mod download
+
+RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest && \
+ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
+
+ENV PATH=$PATH:/go/bin:/root/go/bin
+
+COPY proto/echo.proto ./proto/
+RUN protoc --go_out=. --go_opt=paths=source_relative \
+ --go-grpc_out=. --go-grpc_opt=paths=source_relative \
+ proto/echo.proto
+
+COPY producer/ ./producer/
+
+ARG GOOS=linux
+ARG GOARCH=amd64
+RUN GOOS=${GOOS} GOARCH=${GOARCH} CGO_ENABLED=0 go build -a -ldflags
'-extldflags "-static"' -o /build/grpc-producer ./producer/
+
+FROM alpine:latest
+RUN apk update && \
+ apk --no-cache add ca-certificates tzdata || \
+ (sleep 2 && apk update && apk --no-cache add ca-certificates tzdata)
+WORKDIR /app
+
+COPY --from=builder /build/grpc-producer /usr/local/bin/grpc-producer
+RUN chmod +x /usr/local/bin/grpc-producer
+
+ENTRYPOINT ["/usr/local/bin/grpc-producer"]
diff --git a/test/grpc-proxyless/README.md b/test/grpc-proxyless/README.md
new file mode 100644
index 00000000..fa268450
--- /dev/null
+++ b/test/grpc-proxyless/README.md
@@ -0,0 +1,12 @@
+# gRPC Proxyless Test Example
+
+This is a test example for gRPC proxyless service mesh based on [Istio's blog
post](https://istio.io/latest/blog/2021/proxyless-grpc/).
+
+## Architecture
+
+- **Consumer**: gRPC server with xDS support (port 17070)
+- **Producer**: gRPC client with xDS support + test server (port 17171)
+
+Both services use the `dubbo-proxy` sidecar as an xDS proxy to connect to the control plane. The sidecar runs an xDS proxy server that listens on a Unix Domain Socket (UDS) at `/etc/dubbo/proxy/XDS`. The gRPC applications connect to this xDS proxy over the UDS socket using the bootstrap file referenced by the `GRPC_XDS_BOOTSTRAP` environment variable.
+
+**Note**: This is "proxyless" in the sense that the applications use native
gRPC xDS clients instead of Envoy proxy for traffic routing. However, a
lightweight sidecar (`dubbo-proxy`) is still used to proxy xDS API calls
between the gRPC clients and the control plane.
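+
+## xDS bootstrap (illustrative)
+
+The bootstrap file is written by the injected `dubbo-proxy` sidecar; the sketch below only shows the
+general shape of a gRPC xDS bootstrap pointing at the UDS socket. The field values here are
+assumptions for illustration, not the exact output of the agent:
+
+```json
+{
+  "xds_servers": [
+    {
+      "server_uri": "unix:///etc/dubbo/proxy/XDS",
+      "channel_creds": [{ "type": "insecure" }],
+      "server_features": ["xds_v3"]
+    }
+  ],
+  "node": {
+    "id": "<node-id-written-by-the-agent>"
+  }
+}
+```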
diff --git a/test/grpc-proxyless/consumer/main.go
b/test/grpc-proxyless/consumer/main.go
new file mode 100644
index 00000000..61ce9f4d
--- /dev/null
+++ b/test/grpc-proxyless/consumer/main.go
@@ -0,0 +1,135 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "log"
+ "net"
+ "os"
+ "os/signal"
+ "syscall"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ xdscreds "google.golang.org/grpc/credentials/xds"
+ "google.golang.org/grpc/reflection"
+ "google.golang.org/grpc/xds"
+
+ pb "github.com/apache/dubbo-kubernetes/test/grpc-proxyless/proto"
+)
+
+var (
+ port = flag.Int("port", 17070, "gRPC server port")
+)
+
+type echoServer struct {
+ pb.UnimplementedEchoServiceServer
+ pb.UnimplementedEchoTestServiceServer
+ hostname string
+}
+
+func (s *echoServer) Echo(ctx context.Context, req *pb.EchoRequest)
(*pb.EchoResponse, error) {
+ if req == nil {
+ return nil, fmt.Errorf("request is nil")
+ }
+ log.Printf("Received: %v", req.Message)
+ return &pb.EchoResponse{
+ Message: req.Message,
+ Hostname: s.hostname,
+ }, nil
+}
+
+func (s *echoServer) StreamEcho(req *pb.EchoRequest, stream
pb.EchoService_StreamEchoServer) error {
+ if req == nil {
+ return fmt.Errorf("request is nil")
+ }
+ if stream == nil {
+ return fmt.Errorf("stream is nil")
+ }
+ log.Printf("StreamEcho received: %v", req.Message)
+ for i := 0; i < 3; i++ {
+ if err := stream.Send(&pb.EchoResponse{
+ Message: fmt.Sprintf("%s [%d]", req.Message, i),
+ Hostname: s.hostname,
+ }); err != nil {
+ log.Printf("StreamEcho send error: %v", err)
+ return err
+ }
+ }
+ return nil
+}
+
+func (s *echoServer) ForwardEcho(ctx context.Context, req
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
+ if req == nil {
+ return nil, fmt.Errorf("request is nil")
+ }
+
+ count := req.Count
+ if count < 0 {
+ count = 0
+ }
+ if count > 100 {
+ count = 100
+ }
+
+ log.Printf("ForwardEcho called: url=%s, count=%d", req.Url, count)
+
+ output := make([]string, 0, count)
+ for i := int32(0); i < count; i++ {
+ line := fmt.Sprintf("[%d body] Hostname=%s", i, s.hostname)
+ output = append(output, line)
+ }
+
+ return &pb.ForwardEchoResponse{
+ Output: output,
+ }, nil
+}
+
+func main() {
+ flag.Parse()
+
+ hostname, _ := os.Hostname()
+ if hostname == "" {
+ hostname = "unknown"
+ }
+
+ // Create xDS-enabled gRPC server
+ // For proxyless gRPC, we use xds.NewGRPCServer() instead of
grpc.NewServer()
+ creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
+ FallbackCreds: insecure.NewCredentials(),
+ })
+ if err != nil {
+ log.Fatalf("Failed to create xDS server credentials: %v", err)
+ }
+
+ server, err := xds.NewGRPCServer(grpc.Creds(creds))
+ if err != nil {
+ log.Fatalf("Failed to create xDS gRPC server: %v", err)
+ }
+
+ es := &echoServer{hostname: hostname}
+ pb.RegisterEchoServiceServer(server, es)
+ pb.RegisterEchoTestServiceServer(server, es)
+ // Enable reflection API for grpcurl to discover services
+ reflection.Register(server)
+
+ lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
+ if err != nil {
+ log.Fatalf("Failed to listen: %v", err)
+ }
+
+ log.Printf("Starting gRPC proxyless server on port %d (hostname: %s)",
*port, hostname)
+
+ go func() {
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+ <-sigChan
+ log.Println("Shutting down server...")
+ server.GracefulStop()
+ }()
+
+ if err := server.Serve(lis); err != nil {
+ log.Fatalf("Failed to serve: %v", err)
+ }
+}
diff --git a/test/grpc-proxyless/generate-proto.sh
b/test/grpc-proxyless/generate-proto.sh
new file mode 100755
index 00000000..99ead4b3
--- /dev/null
+++ b/test/grpc-proxyless/generate-proto.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+set -e
+cd "$(dirname "$0")"
+
+# This script will be used when protoc is fixed
+# For now, proto files are generated in Dockerfile
+echo "Proto files are generated in Dockerfile during build"
+echo "To generate locally, fix protoc first:"
+echo " brew reinstall protobuf abseil"
+echo ""
+echo "Then run:"
+echo " protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=.
--go-grpc_opt=paths=source_relative proto/echo.proto"
diff --git a/test/grpc-proxyless/go.mod b/test/grpc-proxyless/go.mod
new file mode 100644
index 00000000..bef17d48
--- /dev/null
+++ b/test/grpc-proxyless/go.mod
@@ -0,0 +1,29 @@
+module github.com/apache/dubbo-kubernetes/test/grpc-proxyless
+
+go 1.24.0
+
+require (
+ google.golang.org/grpc v1.76.0
+ google.golang.org/protobuf v1.36.10
+)
+
+require (
+ cel.dev/expr v0.24.0 // indirect
+ cloud.google.com/go/compute/metadata v0.7.0 // indirect
+ github.com/cespare/xxhash/v2 v2.3.0 // indirect
+ github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
+ github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect
+ github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
+ github.com/go-jose/go-jose/v4 v4.1.2 // indirect
+ github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10
// indirect
+ github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect
+ github.com/zeebo/errs v1.4.0 // indirect
+ golang.org/x/crypto v0.43.0 // indirect
+ golang.org/x/net v0.46.0 // indirect
+ golang.org/x/oauth2 v0.30.0 // indirect
+ golang.org/x/sync v0.17.0 // indirect
+ golang.org/x/sys v0.37.0 // indirect
+ golang.org/x/text v0.30.0 // indirect
+ google.golang.org/genproto/googleapis/api
v0.0.0-20250804133106-a7a43d27e69b // indirect
+ google.golang.org/genproto/googleapis/rpc
v0.0.0-20251103181224-f26f9409b101 // indirect
+)
diff --git a/test/grpc-proxyless/go.sum b/test/grpc-proxyless/go.sum
new file mode 100644
index 00000000..a0ddf76c
--- /dev/null
+++ b/test/grpc-proxyless/go.sum
@@ -0,0 +1,76 @@
+cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
+cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
+cloud.google.com/go/compute/metadata v0.7.0
h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU=
+cloud.google.com/go/compute/metadata v0.7.0/go.mod
h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo=
+github.com/cespare/xxhash/v2 v2.3.0
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443
h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
+github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod
h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
+github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/envoyproxy/go-control-plane v0.13.4
h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
+github.com/envoyproxy/go-control-plane v0.13.4/go.mod
h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA=
+github.com/envoyproxy/go-control-plane/envoy v1.32.4
h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A=
+github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod
h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
+github.com/envoyproxy/go-control-plane/ratelimit v0.1.0
h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI=
+github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod
h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4=
+github.com/envoyproxy/protoc-gen-validate v1.2.1
h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
+github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod
h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
+github.com/go-jose/go-jose/v4 v4.1.2
h1:TK/7NqRQZfgAh+Td8AlsrvtPoUyiHh0LqVvokh+1vHI=
+github.com/go-jose/go-jose/v4 v4.1.2/go.mod
h1:22cg9HWM1pOlnRiY+9cQYJ9XHmya1bYW8OeDM6Ku6Oo=
+github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
+github.com/go-logr/logr v1.4.3/go.mod
h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod
h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/golang/protobuf v1.5.4
h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
+github.com/golang/protobuf v1.5.4/go.mod
h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10
h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
+github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod
h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
+github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/spiffe/go-spiffe/v2 v2.5.0
h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
+github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod
h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
+github.com/stretchr/testify v1.10.0
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+github.com/stretchr/testify v1.10.0/go.mod
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM=
+github.com/zeebo/errs v1.4.0/go.mod
h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
+go.opentelemetry.io/auto/sdk v1.1.0
h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
+go.opentelemetry.io/auto/sdk v1.1.0/go.mod
h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
+go.opentelemetry.io/otel v1.37.0
h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
+go.opentelemetry.io/otel v1.37.0/go.mod
h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
+go.opentelemetry.io/otel/metric v1.37.0
h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE=
+go.opentelemetry.io/otel/metric v1.37.0/go.mod
h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E=
+go.opentelemetry.io/otel/sdk v1.37.0
h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI=
+go.opentelemetry.io/otel/sdk v1.37.0/go.mod
h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg=
+go.opentelemetry.io/otel/sdk/metric v1.37.0
h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc=
+go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod
h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps=
+go.opentelemetry.io/otel/trace v1.37.0
h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
+go.opentelemetry.io/otel/trace v1.37.0/go.mod
h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
+golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
+golang.org/x/crypto v0.43.0/go.mod
h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
+golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
+golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
+golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
+golang.org/x/oauth2 v0.30.0/go.mod
h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU=
+golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
+golang.org/x/sync v0.17.0/go.mod
h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
+golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
+golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
+golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
+golang.org/x/text v0.30.0/go.mod
h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
+gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
+gonum.org/v1/gonum v0.16.0/go.mod
h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
+google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b
h1:ULiyYQ0FdsJhwwZUwbaXpZF5yUE3h+RA+gxvBu37ucc=
+google.golang.org/genproto/googleapis/api
v0.0.0-20250804133106-a7a43d27e69b/go.mod
h1:oDOGiMSXHL4sDTJvFvIB9nRQCGdLP1o/iVaqQK8zB+M=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101
h1:tRPGkdGHuewF4UisLzzHHr1spKw92qLM98nIzxbC0wY=
+google.golang.org/genproto/googleapis/rpc
v0.0.0-20251103181224-f26f9409b101/go.mod
h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
+google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
+google.golang.org/grpc v1.76.0/go.mod
h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
+google.golang.org/protobuf v1.36.10
h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
+google.golang.org/protobuf v1.36.10/go.mod
h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/test/grpc-proxyless/producer/main.go
b/test/grpc-proxyless/producer/main.go
new file mode 100644
index 00000000..b2284a20
--- /dev/null
+++ b/test/grpc-proxyless/producer/main.go
@@ -0,0 +1,401 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "log"
+ "net"
+ "os"
+ "os/signal"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/credentials/insecure"
+ xdscreds "google.golang.org/grpc/credentials/xds"
+ "google.golang.org/grpc/reflection"
+ _ "google.golang.org/grpc/xds"
+
+ pb "github.com/apache/dubbo-kubernetes/test/grpc-proxyless/proto"
+)
+
+var (
+ target = flag.String("target", "", "Target service address with
xds:/// scheme (optional)")
+ count = flag.Int("count", 5, "Number of requests to send")
+ port = flag.Int("port", 17171, "gRPC server port for ForwardEcho
testing")
+ testServer *grpc.Server
+)
+
+func main() {
+ flag.Parse()
+
+ go startTestServer(*port)
+
+ if *target != "" {
+ testDirectConnection(*target, *count)
+ }
+
+ log.Printf("Producer running. Test server listening on port %d for
ForwardEcho", *port)
+
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+ <-sigChan
+ log.Println("Shutting down...")
+
+ if testServer != nil {
+ log.Println("Stopping test server...")
+ testServer.GracefulStop()
+ }
+}
+
+func testDirectConnection(target string, count int) {
+ creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
+ FallbackCreds: insecure.NewCredentials(),
+ })
+ if err != nil {
+ log.Fatalf("Failed to create xDS client credentials: %v", err)
+ }
+
+ ctx := context.Background()
+ conn, err := grpc.DialContext(ctx, target,
grpc.WithTransportCredentials(creds))
+ if err != nil {
+ log.Fatalf("Failed to connect: %v", err)
+ }
+ defer conn.Close()
+
+ client := pb.NewEchoServiceClient(conn)
+
+ log.Printf("Connected to %s, sending %d requests...", target, count)
+
+ for i := 0; i < count; i++ {
+ req := &pb.EchoRequest{
+ Message: fmt.Sprintf("Hello from producer [%d]", i+1),
+ }
+
+ reqCtx, cancel := context.WithTimeout(context.Background(),
10*time.Second)
+ resp, err := client.Echo(reqCtx, req)
+ cancel()
+
+ if err != nil {
+ log.Printf("Request %d failed: %v", i+1, err)
+ continue
+ }
+
+ if resp == nil {
+ log.Printf("Request %d failed: response is nil", i+1)
+ continue
+ }
+
+ log.Printf("Request %d: Response=%s, Hostname=%s", i+1,
resp.Message, resp.Hostname)
+ time.Sleep(500 * time.Millisecond)
+ }
+
+ log.Println("All requests completed")
+}
+
+func startTestServer(port int) {
+ lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
+ if err != nil {
+ log.Printf("Failed to listen on port %d: %v", port, err)
+ return
+ }
+
+ testServer = grpc.NewServer()
+ pb.RegisterEchoTestServiceServer(testServer, &testServerImpl{
+ connCache: make(map[string]*grpc.ClientConn),
+ })
+ reflection.Register(testServer)
+
+ log.Printf("Test server listening on port %d for ForwardEcho
(reflection enabled)", port)
+ if err := testServer.Serve(lis); err != nil {
+ log.Printf("Test server error: %v", err)
+ }
+}
+
+type testServerImpl struct {
+ pb.UnimplementedEchoTestServiceServer
+ // Connection cache: map from URL to gRPC connection
+ connCache map[string]*grpc.ClientConn
+ connMutex sync.RWMutex
+}
+
+func (s *testServerImpl) ForwardEcho(ctx context.Context, req
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
+ if req == nil {
+ return nil, fmt.Errorf("request is nil")
+ }
+
+ if req.Url == "" {
+ return nil, fmt.Errorf("url is required")
+ }
+
+ count := req.Count
+ if count < 0 {
+ count = 0
+ }
+ if count > 100 {
+ count = 100
+ }
+
+ log.Printf("ForwardEcho: url=%s, count=%d", req.Url, count)
+
+ // Check bootstrap configuration
+ bootstrapPath := os.Getenv("GRPC_XDS_BOOTSTRAP")
+ if bootstrapPath == "" {
+ return nil, fmt.Errorf("GRPC_XDS_BOOTSTRAP environment variable
is not set")
+ }
+
+ // Verify bootstrap file exists
+ if _, err := os.Stat(bootstrapPath); os.IsNotExist(err) {
+ return nil, fmt.Errorf("bootstrap file does not exist: %s",
bootstrapPath)
+ }
+
+ // Read bootstrap file to verify UDS socket
+ bootstrapData, err := os.ReadFile(bootstrapPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read bootstrap file: %v", err)
+ }
+
+ var bootstrapJSON map[string]interface{}
+ if err := json.Unmarshal(bootstrapData, &bootstrapJSON); err != nil {
+ return nil, fmt.Errorf("failed to parse bootstrap file: %v",
err)
+ }
+
+ // Extract UDS socket path
+ var udsPath string
+ if xdsServers, ok := bootstrapJSON["xds_servers"].([]interface{}); ok
&& len(xdsServers) > 0 {
+ if server, ok := xdsServers[0].(map[string]interface{}); ok {
+ if serverURI, ok := server["server_uri"].(string); ok {
+ if strings.HasPrefix(serverURI, "unix://") {
+ udsPath = strings.TrimPrefix(serverURI,
"unix://")
+ if _, err := os.Stat(udsPath);
os.IsNotExist(err) {
+ return nil, fmt.Errorf("UDS
socket does not exist: %s", udsPath)
+ }
+ }
+ }
+ }
+ }
+
+ // CRITICAL: Reuse connections to avoid creating new xDS connections
for each RPC call
+ // This prevents the RDS request loop issue and ensures stable
connection state
+ s.connMutex.RLock()
+ conn, exists := s.connCache[req.Url]
+ s.connMutex.RUnlock()
+
+ // Check if cached connection is still valid (not closed/shutdown)
+ if exists && conn != nil {
+ state := conn.GetState()
+ if state == connectivity.Shutdown {
+ // Connection is closed, remove from cache and create
new one
+ log.Printf("ForwardEcho: cached connection for %s is
SHUTDOWN, removing from cache", req.Url)
+ s.connMutex.Lock()
+ delete(s.connCache, req.Url)
+ conn = nil
+ exists = false
+ s.connMutex.Unlock()
+ }
+ }
+
+ if !exists || conn == nil {
+ // Create new connection
+ s.connMutex.Lock()
+ // Double-check after acquiring write lock
+ if conn, exists = s.connCache[req.Url]; !exists || conn == nil {
+ // Create xDS client credentials
+ creds, err :=
xdscreds.NewClientCredentials(xdscreds.ClientOptions{
+ FallbackCreds: insecure.NewCredentials(),
+ })
+ if err != nil {
+ s.connMutex.Unlock()
+ return nil, fmt.Errorf("failed to create xDS
client credentials: %v", err)
+ }
+
+ // Dial with xDS URL - use background context, not the
request context
+ // The request context might timeout before xDS
configuration is received
+ log.Printf("ForwardEcho: creating new connection for
%s...", req.Url)
+ conn, err = grpc.DialContext(context.Background(),
req.Url, grpc.WithTransportCredentials(creds))
+ if err != nil {
+ s.connMutex.Unlock()
+ return nil, fmt.Errorf("failed to dial %s: %v",
req.Url, err)
+ }
+ s.connCache[req.Url] = conn
+ log.Printf("ForwardEcho: cached connection for %s",
req.Url)
+ }
+ s.connMutex.Unlock()
+ } else {
+ log.Printf("ForwardEcho: reusing cached connection for %s
(state: %v)", req.Url, conn.GetState())
+ }
+
+ initialState := conn.GetState()
+ log.Printf("ForwardEcho: initial connection state: %v", initialState)
+
+ // CRITICAL: If connection is already READY, use it directly without
waiting
+ // For cached connections, they should already be in READY state
+ if initialState == connectivity.Ready {
+ log.Printf("ForwardEcho: connection is already READY,
proceeding with RPC calls")
+ } else {
+ // Only wait for new connections or connections that are not
READY
+ // For gRPC xDS proxyless, we need to wait for the client to
receive and process LDS/CDS/EDS
+ // The connection state may transition: IDLE -> CONNECTING ->
READY (or TRANSIENT_FAILURE -> CONNECTING -> READY)
+ log.Printf("ForwardEcho: waiting for xDS configuration to be
processed and connection to be ready (30 seconds)...")
+
+ // Wait for state changes with multiple attempts
+ maxWait := 30 * time.Second
+
+ // Wait for state changes, allowing multiple state transitions
+ // CRITICAL: Don't exit on TRANSIENT_FAILURE - it may recover
to READY
+ stateChanged := false
+ currentState := initialState
+ startTime := time.Now()
+ lastStateChangeTime := startTime
+
+ for time.Since(startTime) < maxWait {
+ if currentState == connectivity.Ready {
+ log.Printf("ForwardEcho: connection is READY
after %v", time.Since(startTime))
+ stateChanged = true
+ break
+ }
+
+ // Only exit on Shutdown, not on TransientFailure (it
may recover)
+ if currentState == connectivity.Shutdown {
+ log.Printf("ForwardEcho: connection in %v state
after %v, cannot recover", currentState, time.Since(startTime))
+ break
+ }
+
+ // Wait for state change with remaining timeout
+ remaining := maxWait - time.Since(startTime)
+ if remaining <= 0 {
+ break
+ }
+
+ // Use shorter timeout for each WaitForStateChange call
to allow periodic checks
+ waitTimeout := remaining
+ if waitTimeout > 5*time.Second {
+ waitTimeout = 5 * time.Second
+ }
+
+ stateCtx, stateCancel :=
context.WithTimeout(context.Background(), waitTimeout)
+ if conn.WaitForStateChange(stateCtx, currentState) {
+ newState := conn.GetState()
+ elapsed := time.Since(startTime)
+ log.Printf("ForwardEcho: connection state
changed from %v to %v after %v", currentState, newState, elapsed)
+ stateChanged = true
+ currentState = newState
+ lastStateChangeTime = time.Now()
+
+ // If READY, we're done
+ if newState == connectivity.Ready {
+ stateCancel()
+ break
+ }
+
+ // If we're in TRANSIENT_FAILURE, continue
waiting - it may recover
+ // gRPC xDS client will retry connection when
endpoints become available
+ if newState == connectivity.TransientFailure {
+ log.Printf("ForwardEcho: connection in
TRANSIENT_FAILURE, continuing to wait for recovery (remaining: %v)",
maxWait-elapsed)
+ }
+ } else {
+ // Timeout waiting for state change - check if
we should continue
+ elapsed := time.Since(startTime)
+ if currentState ==
connectivity.TransientFailure {
+ // If we've been in TRANSIENT_FAILURE
for a while, continue waiting
+ // The connection may recover when
endpoints become available
+ if time.Since(lastStateChangeTime) <
10*time.Second {
+ log.Printf("ForwardEcho: still
in TRANSIENT_FAILURE after %v, continuing to wait (remaining: %v)", elapsed,
maxWait-elapsed)
+ } else {
+ log.Printf("ForwardEcho: no
state change after %v, current state: %v (remaining: %v)", elapsed,
currentState, maxWait-elapsed)
+ }
+ } else {
+ log.Printf("ForwardEcho: no state
change after %v, current state: %v (remaining: %v)", elapsed, currentState,
maxWait-elapsed)
+ }
+ }
+ stateCancel()
+ }
+
+ finalState := conn.GetState()
+ log.Printf("ForwardEcho: final connection state: %v
(stateChanged=%v, waited=%v)", finalState, stateChanged, time.Since(startTime))
+
+ // If connection is not READY, log a warning but proceed anyway
+ // The first RPC call may trigger connection establishment
+ if finalState != connectivity.Ready {
+ log.Printf("ForwardEcho: WARNING - connection is not
READY (state=%v), but proceeding with RPC calls", finalState)
+ }
+ }
+
+ // Create client and make RPC calls
+ client := pb.NewEchoServiceClient(conn)
+ output := make([]string, 0, count)
+
+ log.Printf("ForwardEcho: sending %d requests...", count)
+ for i := int32(0); i < count; i++ {
+ echoReq := &pb.EchoRequest{
+ Message: fmt.Sprintf("Request %d", i+1),
+ }
+
+ currentState := conn.GetState()
+ log.Printf("ForwardEcho: sending request %d (connection state:
%v)...", i+1, currentState)
+
+ // Use longer timeout for first request to allow connection
establishment
+ // For subsequent requests, use shorter timeout but still allow
for retries
+ timeout := 30 * time.Second
+ if i > 0 {
+ timeout = 20 * time.Second
+ }
+
+ reqCtx, reqCancel := context.WithTimeout(context.Background(),
timeout)
+ reqStartTime := time.Now()
+ resp, err := client.Echo(reqCtx, echoReq)
+ duration := time.Since(reqStartTime)
+ reqCancel()
+
+ // Check connection state after RPC call
+ stateAfterRPC := conn.GetState()
+ log.Printf("ForwardEcho: request %d completed in %v, connection
state: %v (was %v)", i+1, duration, stateAfterRPC, currentState)
+
+ if err != nil {
+ log.Printf("ForwardEcho: request %d failed: %v", i+1,
err)
+ output = append(output, fmt.Sprintf("[%d] Error: %v",
i, err))
+
+ // If connection is in TRANSIENT_FAILURE, wait a bit
before next request
+ // to allow gRPC client to retry and recover
+ if stateAfterRPC == connectivity.TransientFailure && i
< count-1 {
+ waitTime := 2 * time.Second
+ log.Printf("ForwardEcho: connection in
TRANSIENT_FAILURE, waiting %v before next request...", waitTime)
+ time.Sleep(waitTime)
+
+ // Check if connection recovered
+ newState := conn.GetState()
+ if newState == connectivity.Ready {
+ log.Printf("ForwardEcho: connection
recovered to READY after wait")
+ } else {
+ log.Printf("ForwardEcho: connection
state after wait: %v", newState)
+ }
+ }
+ continue
+ }
+
+ if resp == nil {
+ log.Printf("ForwardEcho: request %d failed: response is
nil", i+1)
+ output = append(output, fmt.Sprintf("[%d] Error:
response is nil", i))
+ continue
+ }
+
+ log.Printf("ForwardEcho: request %d succeeded: Hostname=%s",
i+1, resp.Hostname)
+ output = append(output, fmt.Sprintf("[%d body] Hostname=%s", i,
resp.Hostname))
+
+ // Small delay between successful requests to avoid
overwhelming the server
+ if i < count-1 {
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+
+ log.Printf("ForwardEcho: completed %d requests", count)
+
+ return &pb.ForwardEchoResponse{
+ Output: output,
+ }, nil
+}
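
Note on wiring: the ForwardEcho handler above is served on the producer's plain gRPC port 17171, which the test scripts further below port-forward to. The sketch that follows is illustrative only — the echoTestServer type, its fields, and the reflection registration are assumptions (only the generated pb API is taken from this commit), and the actual producer/main.go may wire things differently:

// Sketch only: names beyond the generated pb API are illustrative.
package main

import (
	"log"
	"net"
	"sync"

	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"

	pb "github.com/apache/dubbo-kubernetes/test/grpc-proxyless/proto"
)

// echoTestServer carries the connection cache used by the ForwardEcho handler above.
type echoTestServer struct {
	pb.UnimplementedEchoTestServiceServer
	connMutex sync.RWMutex
	connCache map[string]*grpc.ClientConn // keyed by the xds:/// target URL
}

// ForwardEcho (shown in the diff above) would be a method on *echoTestServer.

func main() {
	lis, err := net.Listen("tcp", ":17171")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}
	srv := grpc.NewServer()
	pb.RegisterEchoTestServiceServer(srv, &echoTestServer{connCache: make(map[string]*grpc.ClientConn)})
	reflection.Register(srv) // lets grpcurl discover echo.EchoTestService without local .proto files
	log.Println("EchoTestService listening on :17171")
	if err := srv.Serve(lis); err != nil {
		log.Fatalf("serve: %v", err)
	}
}
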
diff --git a/test/grpc-proxyless/proto/echo.proto
b/test/grpc-proxyless/proto/echo.proto
new file mode 100644
index 00000000..cb36e618
--- /dev/null
+++ b/test/grpc-proxyless/proto/echo.proto
@@ -0,0 +1,42 @@
+syntax = "proto3";
+
+package echo;
+
+option go_package =
"github.com/apache/dubbo-kubernetes/test/grpc-proxyless/proto";
+
+// Echo service for testing gRPC proxyless
+service EchoService {
+ // Echo returns the same message sent
+ rpc Echo(EchoRequest) returns (EchoResponse);
+
+ // StreamEcho streams back the same message
+ rpc StreamEcho(EchoRequest) returns (stream EchoResponse);
+}
+
+// EchoTestService for grpcurl testing (similar to Istio's echo service)
+service EchoTestService {
+ // ForwardEcho forwards a request to another service and returns the response
+ rpc ForwardEcho(ForwardEchoRequest) returns (ForwardEchoResponse);
+}
+
+message EchoRequest {
+ string message = 1;
+}
+
+message EchoResponse {
+ string message = 1;
+ string hostname = 2;
+}
+
+message ForwardEchoRequest {
+ string url = 1; // Target URL (e.g.,
"xds:///consumer.grpc-proxyless.svc.cluster.local:7070")
+ int32 count = 2; // Number of requests to send
+ map<string, string> headers = 3;
+ int32 timeout = 4; // Timeout in seconds
+ bool h2 = 5; // Use HTTP/2
+ bool insecure = 6; // Use insecure connection
+}
+
+message ForwardEchoResponse {
+ repeated string output = 1; // Output lines from the requests
+}
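
For the client side of this contract, the sketch below dials the xds:/// target from the comment above using the same xdscreds fallback pattern as producer/main.go earlier in this diff. It is a sketch under two assumptions: an xDS bootstrap is already available to grpc-go in the process (for example via GRPC_XDS_BOOTSTRAP; how the agent provides it is outside this sketch), and the test module's import path matches the go_package option:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	xdscreds "google.golang.org/grpc/credentials/xds"
	_ "google.golang.org/grpc/xds" // registers the xds:/// resolver and LB policies

	pb "github.com/apache/dubbo-kubernetes/test/grpc-proxyless/proto"
)

func main() {
	// Fall back to plaintext when the control plane does not push TLS config.
	creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
		FallbackCreds: insecure.NewCredentials(),
	})
	if err != nil {
		log.Fatalf("xds credentials: %v", err)
	}
	// Same non-blocking dial pattern as ForwardEcho; LDS/RDS/CDS/EDS are resolved lazily.
	conn, err := grpc.DialContext(context.Background(),
		"xds:///consumer.grpc-proxyless.svc.cluster.local:7070",
		grpc.WithTransportCredentials(creds))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	resp, err := pb.NewEchoServiceClient(conn).Echo(ctx, &pb.EchoRequest{Message: "hello"})
	if err != nil {
		log.Fatalf("echo: %v", err)
	}
	log.Printf("reply from %s: %s", resp.Hostname, resp.Message)
}
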
diff --git a/test/grpc-proxyless/proto/gen.sh b/test/grpc-proxyless/proto/gen.sh
new file mode 100755
index 00000000..fc0bea40
--- /dev/null
+++ b/test/grpc-proxyless/proto/gen.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+set -e
+cd "$(dirname "$0")/.."
+export PATH=$PATH:$(go env GOPATH)/bin
+protoc --go_out=. --go_opt=paths=source_relative \
+ --go-grpc_out=. --go-grpc_opt=paths=source_relative \
+ proto/echo.proto
diff --git a/test/grpc-proxyless/test-commands.sh
b/test/grpc-proxyless/test-commands.sh
new file mode 100755
index 00000000..d78b8d46
--- /dev/null
+++ b/test/grpc-proxyless/test-commands.sh
@@ -0,0 +1,49 @@
+#!/bin/bash
+set -e
+
+NAMESPACE="grpc-proxyless"
+
+echo "=== gRPC Proxyless 测试命令 ==="
+echo ""
+
+echo "=== 快速测试步骤 ==="
+echo ""
+echo "# 1. 重新构建 producer 镜像(代码已修改)"
+echo "cd test/grpc-proxyless"
+echo "docker build -f Dockerfile.producer -t mfordjody/grpc-producer:latest ."
+echo "docker push mfordjody/grpc-producer:latest # 如果需要推送到 registry"
+echo ""
+echo "# 2. 重启 producer deployment"
+echo "kubectl rollout restart deployment/producer -n grpc-proxyless"
+echo "kubectl rollout status deployment/producer -n grpc-proxyless"
+echo ""
+echo "# 3. 等待 pod 就绪"
+echo "kubectl wait --for=condition=ready pod -l app=producer -n grpc-proxyless
--timeout=60s"
+echo ""
+echo "# 4. 查看 producer 应用日志(观察连接状态和 RPC 调用)"
+echo "kubectl logs -f -l app=producer -c app -n grpc-proxyless --tail=100"
+echo ""
+echo "# 5. 查看 producer xDS proxy 日志(观察 EDS 响应)"
+echo "kubectl logs -f -l app=producer -c dubbo-proxy -n grpc-proxyless
--tail=100 | grep -i 'xds\|eds\|connection'"
+echo ""
+echo "# 6. 测试 ForwardEcho(需要先启动 port-forward)"
+echo "# 在另一个终端执行:"
+echo "kubectl port-forward -n grpc-proxyless \$(kubectl get pod -l
app=producer -n grpc-proxyless -o jsonpath='{.items[0].metadata.name}')
17171:17171"
+echo ""
+echo "# 然后执行测试(注意:-d 参数必须在地址之前!):"
+echo ""
+echo "# 方法1: 直接使用 -d 参数(推荐)"
+echo "grpcurl -plaintext -d '{\"url\":
\"xds:///consumer.grpc-proxyless.svc.cluster.local:7070\", \"count\": 5}'
localhost:17171 echo.EchoTestService/ForwardEcho"
+echo ""
+echo "# 方法2: 使用标准输入(如果方法1不行)"
+echo "echo '{\"url\":
\"xds:///consumer.grpc-proxyless.svc.cluster.local:7070\", \"count\": 5}' |
grpcurl -plaintext -d @ localhost:17171 echo.EchoTestService/ForwardEcho"
+echo ""
+echo "# 方法3: 使用文件"
+echo "cat > /tmp/request.json << 'JSON'"
+echo "{\"url\": \"xds:///consumer.grpc-proxyless.svc.cluster.local:7070\",
\"count\": 5}"
+echo "JSON"
+echo "grpcurl -plaintext -d @ /tmp/request.json localhost:17171
echo.EchoTestService/ForwardEcho"
+echo ""
+echo "# 7. 查看控制平面日志(检查 EDS 推送)"
+echo "kubectl logs -f -l app=dubbod -n dubbo-system --tail=100 | grep -i
'eds\|endpoint\|consumer'"
+echo ""
diff --git a/test/grpc-proxyless/test.sh b/test/grpc-proxyless/test.sh
new file mode 100755
index 00000000..1c20b2c9
--- /dev/null
+++ b/test/grpc-proxyless/test.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+set -e
+
+NAMESPACE="grpc-proxyless"
+
+echo "=== 测试 gRPC Proxyless 死循环修复 ==="
+echo ""
+
+# 1. 检查控制平面日志(确认死循环已解决)
+echo "1. 检查控制平面日志(观察 10 秒,确认不再出现死循环)..."
+echo " 运行: kubectl logs -f <dubbod-pod-name> -n <namespace> | grep -E
'LDS.*PUSH|resources:'"
+echo " 期望: 不再出现 resources:1 和 resources:13 的快速交替"
+echo ""
+
+# 2. 检查 xDS proxy 日志
+echo "2. 检查 xDS proxy 日志(确认不再频繁转发)..."
+CONSUMER_POD=$(kubectl get pods -n $NAMESPACE -l app=consumer -o
jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")
+if [ -n "$CONSUMER_POD" ]; then
+ echo " 检查 pod: $CONSUMER_POD"
+ echo " 运行: kubectl logs $CONSUMER_POD -c dubbo-proxy -n $NAMESPACE |
grep -E 'forwarding request.*LDS' | wc -l"
+ echo " 期望: 数量很少,不再频繁转发"
+else
+ echo " 未找到 consumer pod"
+fi
+echo ""
+
+# 3. 获取服务信息
+echo "3. 获取服务信息..."
+kubectl get svc -n $NAMESPACE
+echo ""
+
+# 4. 使用 grpcurl 测试
+echo "4. 使用 grpcurl 测试服务功能..."
+echo ""
+
+# 测试 ForwardEcho(通过 producer)
+PRODUCER_POD=$(kubectl get pods -n $NAMESPACE -l app=producer -o
jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")
+if [ -n "$PRODUCER_POD" ]; then
+ echo " 测试 ForwardEcho(通过 producer pod):"
+ echo " kubectl port-forward -n $NAMESPACE $PRODUCER_POD 17171:17171 &"
+ echo " grpcurl -plaintext localhost:17171
echo.EchoTestService/ForwardEcho -d '{\"url\":
\"xds:///consumer.grpc-proxyless.svc.cluster.local:7070\", \"count\": 1,
\"headers\": {}, \"timeout\": 5}'"
+ echo ""
+fi
+
+# 测试 Echo(直接访问 consumer)
+CONSUMER_POD=$(kubectl get pods -n $NAMESPACE -l app=consumer -o
jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")
+if [ -n "$CONSUMER_POD" ]; then
+ echo " 测试 Echo(直接访问 consumer pod):"
+ echo " kubectl port-forward -n $NAMESPACE $CONSUMER_POD 17070:17070 &"
+ echo " grpcurl -plaintext localhost:17070 echo.EchoService/Echo -d
'{\"message\": \"test\"}'"
+ echo ""
+fi
+
+echo "=== 测试完成 ==="
+
+