This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new e9fcd78e fix eds endpoint push error (#815)
e9fcd78e is described below

commit e9fcd78e0735a7e04adf9f9efff6c079c475c6a9
Author: Jian Zhong <[email protected]>
AuthorDate: Sun Nov 9 04:38:02 2025 +0800

    fix eds endpoint push error (#815)
---
 README.md                                          |   4 +-
 dubbod/planet/pkg/model/endpointshards.go          |  42 +++-
 .../kube/controller/endpointslice.go               |  66 +++++-
 dubbod/planet/pkg/xds/ads.go                       |  61 ++++--
 dubbod/planet/pkg/xds/eds.go                       |  91 +++++++-
 .../planet/pkg/xds/endpoints/endpoint_builder.go   |  68 +++++-
 dubbod/planet/pkg/xds/xdsgen.go                    | 230 ++++++++++++++++++---
 tests/grpc-proxyless/consumer/main.go              | 114 +++++++++-
 tests/grpc-proxyless/producer/main.go              |  42 +++-
 9 files changed, 629 insertions(+), 89 deletions(-)

diff --git a/README.md b/README.md
index 970edb7e..d8d276dc 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,9 @@
 Dubbo Service Mesh for Kubernetes
 </h1>
 
-[![codecov](https://codecov.io/gh/apache/dubbo-kubernetes/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/dubbo-kubernetes)
+[![Go Report Card](https://goreportcard.com/badge/github.com/apache/dubbo-kubernetes)](https://goreportcard.com/report/github.com/apache/dubbo-kubernetes)
+[![Go Reference](https://pkg.go.dev/badge/github.com/apache/dubbo-kubernetes.svg)](https://pkg.go.dev/github.com/apache/dubbo-kubernetes)
+[![Release](https://img.shields.io/github/release/apache/dubbo-kubernetes.svg)](https://github.com/apache/dubbo-kubernetes/releases)
 ![license](https://img.shields.io/badge/license-Apache--2.0-green.svg)
 
 Cloud-Native proxyless service mesh platform for Dubbo microservices, enabling zero-latency inter-service communication across multiple protocols, advanced traffic management, and enterprise-grade security without sidecar proxies. Built on Istio's control plane architecture with native Kubernetes integration, it delivers service mesh capabilities with minimal resource overhead and maximum performance.
diff --git a/dubbod/planet/pkg/model/endpointshards.go b/dubbod/planet/pkg/model/endpointshards.go
index f93d57fb..08850337 100644
--- a/dubbod/planet/pkg/model/endpointshards.go
+++ b/dubbod/planet/pkg/model/endpointshards.go
@@ -61,8 +61,27 @@ func (e *EndpointIndex) clearCacheForService(svc, ns string) {
 }
 
 func endpointUpdateRequiresPush(oldDubboEndpoints []*DubboEndpoint, 
incomingEndpoints []*DubboEndpoint) ([]*DubboEndpoint, bool) {
-       if oldDubboEndpoints == nil {
+       // CRITICAL FIX: If old endpoints are empty (nil or empty slice), and 
we have new endpoints, we must push
+       // This ensures that when endpoints become available after being empty, 
clients receive the update
+       // This is critical for proxyless gRPC - when endpoints become 
available, clients must be notified
+       // to prevent "weighted-target: no targets to pick from" errors
+       // Note: len() for nil slices is defined as zero, so we can use len() 
directly
+       oldWasEmpty := len(oldDubboEndpoints) == 0
+       newIsEmpty := len(incomingEndpoints) == 0
+
+       if oldWasEmpty {
                // If there are no old endpoints, we should push with incoming 
endpoints as there is nothing to compare.
+               // CRITICAL: Even if new endpoints are unhealthy, we must push 
to notify clients that endpoints are now available
+               // The client will handle unhealthy endpoints appropriately 
(e.g., retry, circuit breaker)
+               if !newIsEmpty {
+                       return incomingEndpoints, true
+               }
+               // If both old and new are empty, no push needed
+               return incomingEndpoints, false
+       }
+       // CRITICAL FIX: If old endpoints exist but new endpoints are empty, we 
must push
+       // This ensures that when endpoints become unavailable, clients receive 
empty ClusterLoadAssignment
+       if newIsEmpty {
                return incomingEndpoints, true
        }
        needPush := false
@@ -82,6 +101,7 @@ func endpointUpdateRequiresPush(oldDubboEndpoints []*DubboEndpoint, incomingEndp
                        }
                        newDubboEndpoints = append(newDubboEndpoints, nie)
                } else {
+                       // New endpoint that didn't exist before - always push 
if it's healthy or should be sent
                        if nie.HealthStatus != UnHealthy || 
nie.SendUnhealthyEndpoints {
                                needPush = true
                        }
@@ -89,6 +109,7 @@ func endpointUpdateRequiresPush(oldDubboEndpoints []*DubboEndpoint, incomingEndp
                }
        }
        if !needPush {
+               // Check if any old endpoints were removed
                for _, oie := range oldDubboEndpoints {
                        if _, f := nmap[oie.Key()]; !f {
                                needPush = true
@@ -175,8 +196,23 @@ func (e *EndpointIndex) UpdateServiceEndpoints(
        ep.Lock()
        defer ep.Unlock()
        oldDubboEndpoints := ep.Shards[shard]
+       // CRITICAL: Check if this is a transition from empty to non-empty 
BEFORE calling endpointUpdateRequiresPush
+       // This ensures we always push when endpoints become available, even if 
they're unhealthy
+       // Note: len() for nil slices is defined as zero, so we can use len() 
directly
+       oldWasEmpty := len(oldDubboEndpoints) == 0
+       newIsEmpty := len(dubboEndpoints) == 0
+       transitionFromEmptyToNonEmpty := oldWasEmpty && !newIsEmpty
+
        newDubboEndpoints, needPush := 
endpointUpdateRequiresPush(oldDubboEndpoints, dubboEndpoints)
 
+       // CRITICAL FIX: If endpoints transition from empty to non-empty, we 
MUST push
+       // This is essential for proxyless gRPC - clients need to know 
endpoints are now available
+       // even if they're initially unhealthy (client will handle 
retries/circuit breaking)
+       if transitionFromEmptyToNonEmpty {
+               needPush = true
+               log.Debugf("UpdateServiceEndpoints: service=%s, shard=%v, 
endpoints transitioned from empty to non-empty, forcing push", hostname, shard)
+       }
+
        // CRITICAL: Log endpoint update details for debugging
        if logPushType {
                oldHealthyCount := 0
@@ -197,8 +233,8 @@ func (e *EndpointIndex) UpdateServiceEndpoints(
                                newUnhealthyCount++
                        }
                }
-               log.Debugf("UpdateServiceEndpoints: service=%s, shard=%v, 
oldEndpoints=%d (healthy=%d, unhealthy=%d), newEndpoints=%d (healthy=%d, 
unhealthy=%d), needPush=%v, pushType=%v",
-                       hostname, shard, len(oldDubboEndpoints), 
oldHealthyCount, oldUnhealthyCount, len(newDubboEndpoints), newHealthyCount, 
newUnhealthyCount, needPush, pushType)
+               log.Debugf("UpdateServiceEndpoints: service=%s, shard=%v, 
oldEndpoints=%d (healthy=%d, unhealthy=%d), newEndpoints=%d (healthy=%d, 
unhealthy=%d), needPush=%v, pushType=%v, transitionFromEmptyToNonEmpty=%v",
+                       hostname, shard, len(oldDubboEndpoints), 
oldHealthyCount, oldUnhealthyCount, len(newDubboEndpoints), newHealthyCount, 
newUnhealthyCount, needPush, pushType, transitionFromEmptyToNonEmpty)
        }
 
        if pushType != FullPush && !needPush {
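
The push decision introduced above can be summarized by a small, self-contained sketch. The Endpoint type below is a simplified stand-in for model.DubboEndpoint, and the function is an illustration of the rule, not the committed code:

    package main

    import "fmt"

    // Endpoint is a simplified stand-in for model.DubboEndpoint.
    type Endpoint struct {
        Key     string
        Healthy bool
    }

    // requiresPush mirrors the decision encoded by the patched
    // endpointUpdateRequiresPush: any transition into or out of the empty
    // state forces a push, even if the new endpoints are unhealthy; otherwise
    // a push is needed only when the set of endpoint keys changed.
    func requiresPush(old, incoming []Endpoint) bool {
        oldEmpty, newEmpty := len(old) == 0, len(incoming) == 0
        switch {
        case oldEmpty && newEmpty:
            return false // still nothing to announce
        case oldEmpty || newEmpty:
            return true // empty -> non-empty or non-empty -> empty
        }
        oldKeys := make(map[string]bool, len(old))
        for _, e := range old {
            oldKeys[e.Key] = true
        }
        newKeys := make(map[string]bool, len(incoming))
        for _, e := range incoming {
            newKeys[e.Key] = true
            if !oldKeys[e.Key] {
                return true // a new endpoint appeared
            }
        }
        for k := range oldKeys {
            if !newKeys[k] {
                return true // an old endpoint disappeared
            }
        }
        return false // same keys; the health comparison is omitted in this sketch
    }

    func main() {
        fmt.Println(requiresPush(nil, []Endpoint{{Key: "10.1.2.3:20000"}})) // true
        fmt.Println(requiresPush([]Endpoint{{Key: "10.1.2.3:20000"}}, nil)) // true
    }

The property that matters for proxyless gRPC is that a transition into or out of the empty set always pushes, so clients are never left holding a stale empty ClusterLoadAssignment.
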
diff --git a/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go b/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
index b1a5b022..4ecfff87 100644
--- a/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
+++ b/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
@@ -19,17 +19,18 @@ package controller
 
 import (
        "fmt"
-       "github.com/apache/dubbo-kubernetes/pkg/log"
        "strings"
        "sync"
 
+       "github.com/apache/dubbo-kubernetes/pkg/log"
+
+       "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
        "github.com/apache/dubbo-kubernetes/pkg/config"
        "github.com/apache/dubbo-kubernetes/pkg/config/host"
        "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
        "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
        "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
-       "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
        "github.com/hashicorp/go-multierror"
        "istio.io/api/annotation"
        corev1 "k8s.io/api/core/v1"
@@ -317,13 +318,60 @@ func (esc *endpointSliceController) updateEndpointCacheForSlice(hostName host.Na
                                        }
 
                                        if !matched {
-                                               
log.Debugf("updateEndpointCacheForSlice: failed to match EndpointSlice.Port 
(portNum=%d, portName='%s') with Service %s, using EndpointSlice values",
-                                                       epSlicePortNum, 
epSlicePortName, svcNamespacedName.Name)
-                                               // Fallback: use EndpointSlice 
values
-                                               servicePortNum = epSlicePortNum
-                                               targetPortNum = epSlicePortNum
-                                               if epSlicePortName != "" {
-                                                       portName = 
epSlicePortName
+                                               // CRITICAL FIX: If we can't 
match by Service, try to find ServicePort by port number only
+                                               // This handles cases where 
EndpointSlice.Port.Name doesn't match but port number does
+                                               // This is critical for 
ensuring portName matches Service.Port.Name
+                                               for _, kubePort := range 
kubeSvc.Spec.Ports {
+                                                       if int32(kubePort.Port) 
== epSlicePortNum {
+                                                               // Found by 
port number, use Service.Port.Name
+                                                               servicePortNum 
= int32(kubePort.Port)
+                                                               portName = 
kubePort.Name
+                                                               // Resolve 
targetPortNum
+                                                               if 
kubePort.TargetPort.Type == intstr.Int {
+                                                                       
targetPortNum = kubePort.TargetPort.IntVal
+                                                               } else if 
kubePort.TargetPort.Type == intstr.String && pod != nil {
+                                                                       for _, 
container := range pod.Spec.Containers {
+                                                                               
for _, containerPort := range container.Ports {
+                                                                               
        if containerPort.Name == kubePort.TargetPort.StrVal {
+                                                                               
                targetPortNum = containerPort.ContainerPort
+                                                                               
                break
+                                                                               
        }
+                                                                               
}
+                                                                               
if targetPortNum != 0 {
+                                                                               
        break
+                                                                               
}
+                                                                       }
+                                                               }
+                                                               if 
targetPortNum == 0 {
+                                                                       
targetPortNum = servicePortNum
+                                                               }
+                                                               matched = true
+                                                               
log.Infof("updateEndpointCacheForSlice: matched ServicePort by port number only 
(servicePort=%d, targetPort=%d, portName='%s') for EndpointSlice.Port 
(portNum=%d, portName='%s')",
+                                                                       
servicePortNum, targetPortNum, portName, epSlicePortNum, epSlicePortName)
+                                                               break
+                                                       }
+                                               }
+
+                                               if !matched {
+                                                       
log.Warnf("updateEndpointCacheForSlice: failed to match EndpointSlice.Port 
(portNum=%d, portName='%s') with Service %s, using EndpointSlice values 
(WARNING: portName may not match Service.Port.Name)",
+                                                               epSlicePortNum, 
epSlicePortName, svcNamespacedName.Name)
+                                                       // Fallback: use 
EndpointSlice values
+                                                       // CRITICAL: This 
should rarely happen, but if it does, portName may not match Service.Port.Name
+                                                       // which will cause 
endpoints to be filtered in BuildClusterLoadAssignment
+                                                       servicePortNum = 
epSlicePortNum
+                                                       targetPortNum = 
epSlicePortNum
+                                                       if epSlicePortName != 
"" {
+                                                               portName = 
epSlicePortName
+                                                       } else {
+                                                               // If 
EndpointSlice.Port.Name is also empty, try to get port name from Service by 
port number
+                                                               for _, kubePort 
:= range kubeSvc.Spec.Ports {
+                                                                       if 
int32(kubePort.Port) == epSlicePortNum {
+                                                                               
portName = kubePort.Name
+                                                                               
log.Debugf("updateEndpointCacheForSlice: resolved portName='%s' from Service by 
port number %d", portName, epSlicePortNum)
+                                                                               
break
+                                                                       }
+                                                               }
+                                                       }
                                                }
                                        }
                                } else {
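
The fallback above first matches the EndpointSlice port against the Service by number, then resolves a named targetPort through the pod's container ports. A hedged sketch of that resolution using the upstream Kubernetes API types (the helper name is illustrative, not the controller's code):

    package main

    import (
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/intstr"
    )

    // resolveByPortNumber finds the ServicePort whose numeric port equals the
    // EndpointSlice port, returning the Service-side port name and the target
    // port. ok is false when no ServicePort carries that number.
    func resolveByPortNumber(svcPorts []corev1.ServicePort, pod *corev1.Pod, epPort int32) (name string, target int32, ok bool) {
        for _, sp := range svcPorts {
            if sp.Port != epPort {
                continue
            }
            switch sp.TargetPort.Type {
            case intstr.Int:
                target = sp.TargetPort.IntVal
            case intstr.String:
                // Named targetPort: look it up in the pod's container ports.
                if pod != nil {
                    for _, c := range pod.Spec.Containers {
                        for _, cp := range c.Ports {
                            if cp.Name == sp.TargetPort.StrVal {
                                target = cp.ContainerPort
                            }
                        }
                    }
                }
            }
            if target == 0 {
                target = epPort // same fallback as in the patch above
            }
            return sp.Name, target, true
        }
        return "", 0, false
    }

    func main() {}
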
diff --git a/dubbod/planet/pkg/xds/ads.go b/dubbod/planet/pkg/xds/ads.go
index eb66ca30..55a797dc 100644
--- a/dubbod/planet/pkg/xds/ads.go
+++ b/dubbod/planet/pkg/xds/ads.go
@@ -85,13 +85,16 @@ func (s *DiscoveryServer) StartPush(req *model.PushRequest) {
 }
 
 func (s *DiscoveryServer) AdsPushAll(req *model.PushRequest) {
+       connectedEndpoints := s.adsClientCount()
        if !req.Full {
-               log.Infof("XDS: :%d Version:%s",
-                       s.adsClientCount(), req.Push.PushVersion)
+               // Incremental push - only log connection count and version
+               log.Infof("XDS: Incremental Push to %d Endpoints, Version:%s",
+                       connectedEndpoints, req.Push.PushVersion)
        } else {
+               // Full push - log services, connections, and version
                totalService := len(req.Push.GetAllServices())
                log.Infof("XDS: Pushing Services:%d ConnectedEndpoints:%d 
Version:%s",
-                       totalService, s.adsClientCount(), req.Push.PushVersion)
+                       totalService, connectedEndpoints, req.Push.PushVersion)
 
                if req.ConfigsUpdated == nil {
                        req.ConfigsUpdated = make(sets.Set[model.ConfigKey])
@@ -331,19 +334,38 @@ func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *C
 
        shouldRespond, delta := xds.ShouldRespond(con.proxy, con.ID(), req)
 
-       if !shouldRespond {
-               // Don't log ACK/ignored/expired nonce requests to reduce noise
-               // These are normal and expected, logging them creates too much 
noise
-               return nil
-       }
-
-       // Log requested resource names only for requests that will be processed
+       // CRITICAL: Log NEW requests (will respond) at INFO level so every 
grpcurl request is visible
+       // This ensures every grpcurl request triggers visible xDS logs in 
control plane
+       // Format: "LDS: REQ node-xxx resources:1 nonce:abc123 [resource1, 
resource2] (will respond)"
        resourceNamesStr := ""
        if len(req.ResourceNames) > 0 {
-               resourceNamesStr = fmt.Sprintf(" [%s]", 
strings.Join(req.ResourceNames, ", "))
+               // Show resource names for better debugging
+               if len(req.ResourceNames) <= 10 {
+                       resourceNamesStr = fmt.Sprintf(" [%s]", 
strings.Join(req.ResourceNames, ", "))
+               } else {
+                       // If too many resources, show first 5 and count
+                       resourceNamesStr = fmt.Sprintf(" [%s, ... and %d 
more]", strings.Join(req.ResourceNames[:5], ", "), len(req.ResourceNames)-5)
+               }
+       } else {
+               resourceNamesStr = " [wildcard]"
+       }
+
+       if shouldRespond {
+               // Log NEW requests at INFO level - these are triggered by 
grpcurl requests
+               // This makes it easy to see when a grpcurl request triggers 
xDS configuration
+               log.Infof("%s: REQ %s resources:%d nonce:%s%s (will respond)", 
stype,
+                       con.ID(), len(req.ResourceNames), req.ResponseNonce, 
resourceNamesStr)
+       } else {
+               // Log ACK/ignored requests at DEBUG level to reduce noise
+               // These are normal XDS protocol ACKs, not new requests from 
grpcurl
+               log.Debugf("%s: REQ %s resources:%d nonce:%s%s (ACK/ignored)", 
stype,
+                       con.ID(), len(req.ResourceNames), req.ResponseNonce, 
resourceNamesStr)
+       }
+
+       if !shouldRespond {
+               // Don't process ACK/ignored/expired nonce requests
+               return nil
        }
-       log.Infof("%s: REQ %s resources:%d nonce:%s%s", stype,
-               con.ID(), len(req.ResourceNames), req.ResponseNonce, 
resourceNamesStr)
 
        // CRITICAL FIX: For proxyless gRPC, if client sends wildcard (empty 
ResourceNames) after receiving specific resources,
        // this is likely an ACK and we should NOT push all resources again
@@ -359,12 +381,19 @@ func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *C
                return nil
        }
 
-       // Log PUSH request BEFORE calling pushXds
+       // Log PUSH request BEFORE calling pushXds with detailed information
+       // This helps track what resources are being requested for each push
+       pushResourceNamesStr := ""
        if len(req.ResourceNames) > 0 {
-               log.Infof("%s: PUSH request for node:%s resources:%d", stype, 
con.ID(), len(req.ResourceNames))
+               if len(req.ResourceNames) <= 10 {
+                       pushResourceNamesStr = fmt.Sprintf(" resources:%d 
[%s]", len(req.ResourceNames), strings.Join(req.ResourceNames, ", "))
+               } else {
+                       pushResourceNamesStr = fmt.Sprintf(" resources:%d [%s, 
... and %d more]", len(req.ResourceNames), strings.Join(req.ResourceNames[:5], 
", "), len(req.ResourceNames)-5)
+               }
        } else {
-               log.Infof("%s: PUSH request for node:%s", stype, con.ID())
+               pushResourceNamesStr = " resources:0 [wildcard]"
        }
+       log.Infof("%s: PUSH request for node:%s%s", stype, con.ID(), 
pushResourceNamesStr)
 
        // Don't set Forced=true for regular proxy requests to avoid 
unnecessary ServiceTargets recomputation
        // Only set Forced for debug requests or when explicitly needed
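
The request and push logging above repeats one small truncation idiom; factored out, it looks roughly like this (a sketch only, not a helper that exists in the repo):

    package main

    import (
        "fmt"
        "strings"
    )

    // formatResourceNames renders a resource-name list for a log line:
    // wildcard requests, short lists verbatim, long lists truncated.
    func formatResourceNames(names []string) string {
        switch {
        case len(names) == 0:
            return " [wildcard]"
        case len(names) <= 10:
            return fmt.Sprintf(" [%s]", strings.Join(names, ", "))
        default:
            return fmt.Sprintf(" [%s, ... and %d more]", strings.Join(names[:5], ", "), len(names)-5)
        }
    }

    func main() {
        fmt.Println(formatResourceNames(nil))
        fmt.Println(formatResourceNames([]string{"outbound|20000||producer.default.svc.cluster.local"}))
    }
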
diff --git a/dubbod/planet/pkg/xds/eds.go b/dubbod/planet/pkg/xds/eds.go
index e9c1665c..0a734f8a 100644
--- a/dubbod/planet/pkg/xds/eds.go
+++ b/dubbod/planet/pkg/xds/eds.go
@@ -47,12 +47,38 @@ func (s *DiscoveryServer) EDSUpdate(shard model.ShardKey, serviceName string, na
        dubboEndpoints []*model.DubboEndpoint,
 ) {
        pushType := s.Env.EndpointIndex.UpdateServiceEndpoints(shard, 
serviceName, namespace, dubboEndpoints, true)
+       // CRITICAL FIX: Always push EDS updates when endpoints change state
+       // This ensures clients receive updates when:
+       // 1. Endpoints become available (from empty to non-empty)
+       // 2. Endpoints become unavailable (from non-empty to empty)
+       // 3. Endpoint health status changes
+       // This is critical for proxyless gRPC to prevent "weighted-target: no 
targets to pick from" errors
        if pushType == model.IncrementalPush || pushType == model.FullPush {
+               log.Infof("EDSUpdate: service %s/%s triggering %v push 
(endpoints=%d)", namespace, serviceName, pushType, len(dubboEndpoints))
                s.ConfigUpdate(&model.PushRequest{
                        Full:           pushType == model.FullPush,
                        ConfigsUpdated: sets.New(model.ConfigKey{Kind: 
kind.ServiceEntry, Name: serviceName, Namespace: namespace}),
                        Reason:         
model.NewReasonStats(model.EndpointUpdate),
                })
+       } else if pushType == model.NoPush {
+               // CRITICAL: Even when UpdateServiceEndpoints returns NoPush, 
we may still need to push
+               // This happens when:
+               // 1. All old endpoints were unhealthy and new endpoints are 
also unhealthy (health status didn't change)
+               // 2. But we still need to notify clients about the current 
state
+               // For proxyless gRPC, we should push even if endpoints are 
empty to ensure clients know the state
+               if len(dubboEndpoints) == 0 {
+                       log.Infof("EDSUpdate: service %s/%s endpoints became 
empty (NoPush), forcing push to clear client cache", namespace, serviceName)
+                       s.ConfigUpdate(&model.PushRequest{
+                               Full:           false, // Incremental push
+                               ConfigsUpdated: sets.New(model.ConfigKey{Kind: 
kind.ServiceEntry, Name: serviceName, Namespace: namespace}),
+                               Reason:         
model.NewReasonStats(model.EndpointUpdate),
+                       })
+               } else {
+                       // Endpoints exist but NoPush was returned - the health status did not change,
+                       // so there is nothing new to tell clients and the push is skipped.
+                       // (The empty -> non-empty transition is already forced to push in UpdateServiceEndpoints.)
+                       log.Debugf("EDSUpdate: service %s/%s has %d endpoints 
but NoPush returned (health status unchanged), skipping push", namespace, 
serviceName, len(dubboEndpoints))
+               }
        }
 }
 
@@ -153,23 +179,39 @@ func (eds *EdsGenerator) buildEndpoints(proxy *model.Proxy,
                        }
                }
 
+               // CRITICAL FIX: For proxyless gRPC, we MUST always process all 
watched clusters
+               // This ensures clients receive EDS updates even when endpoints 
become available or unavailable
+               // For non-proxyless (Istio behavior), we skip clusters that 
are not in edsUpdatedServices for incremental pushes
                if edsUpdatedServices != nil {
                        if _, ok := edsUpdatedServices[hostname]; !ok {
-                               // Cluster was not updated, skip recomputing. 
This happens when we get an incremental update for a
-                               // specific Hostname. On connect or for full 
push edsUpdatedServices will be empty.
+                               // Cluster was not in edsUpdatedServices
                                if !serviceWasUpdated {
-                                       continue
+                                       // CRITICAL: For proxyless gRPC, always 
process all clusters to ensure EDS is up-to-date
+                                       // This is essential because proxyless 
gRPC clients need to know endpoint state changes
+                                       // even if the service wasn't 
explicitly updated in this push
+                                       if proxy.IsProxylessGrpc() {
+                                               log.Debugf("buildEndpoints: 
proxyless gRPC, processing cluster %s even though not in edsUpdatedServices 
(hostname=%s)", clusterName, hostname)
+                                               serviceWasUpdated = true
+                                       } else {
+                                               // For non-proxyless, skip if 
not updated (Istio behavior)
+                                               continue
+                                       }
                                }
                        } else {
                                serviceWasUpdated = true
                        }
+               } else {
+                       // If edsUpdatedServices is nil, this is a full push - 
process all clusters
+                       serviceWasUpdated = true
                }
                builder := endpoints.NewEndpointBuilder(clusterName, proxy, 
req.Push)
                if builder == nil {
                        continue
                }
 
-               // CRITICAL FIX: For incremental pushes (Full=false), we must 
always regenerate EDS
+               // CRITICAL FIX: For proxyless gRPC, we must always regenerate 
EDS when serviceWasUpdated is true
+               // to ensure empty endpoints are sent when they become 
unavailable, and endpoints are sent when available.
+               // For incremental pushes (Full=false), we must always 
regenerate EDS
                // if ConfigsUpdated contains this service, because endpoint 
health status may have changed.
                // The cache may contain stale empty endpoints from when the 
endpoint was unhealthy.
                // For full pushes, we can use cache if the service was not 
updated.
@@ -181,11 +223,19 @@ func (eds *EdsGenerator) buildEndpoints(proxy *model.Proxy,
                        for ckey := range req.ConfigsUpdated {
                                if ckey.Kind == kind.ServiceEntry && ckey.Name 
== hostname {
                                        shouldRegenerate = true
+                                       log.Debugf("buildEndpoints: forcing 
regeneration for cluster %s (hostname=%s) due to ConfigsUpdated", clusterName, 
hostname)
                                        break
                                }
                        }
                }
 
+               // CRITICAL: For proxyless gRPC, if serviceWasUpdated is true, 
we must regenerate
+               // even if cache exists, to ensure endpoints are always 
up-to-date
+               // This is critical for preventing "weighted-target: no targets 
to pick from" errors
+               if shouldRegenerate && proxy.IsProxylessGrpc() {
+                       log.Debugf("buildEndpoints: proxyless gRPC, forcing 
regeneration for cluster %s (hostname=%s, serviceWasUpdated=%v)", clusterName, 
hostname, serviceWasUpdated)
+               }
+
                // Try to get from cache only if we don't need to regenerate
                if !shouldRegenerate && eds.Cache != nil {
                        cachedEndpoint := eds.Cache.Get(builder)
@@ -198,22 +248,41 @@ func (eds *EdsGenerator) buildEndpoints(proxy *model.Proxy,
 
                // generate eds from beginning
                l := builder.BuildClusterLoadAssignment(eds.EndpointIndex)
-               // CRITICAL FIX: Even if endpoints are empty (l == nil), we 
should still create an empty ClusterLoadAssignment
-               // to ensure the client receives the EDS response. This is 
necessary for proxyless gRPC clients
-               // to know the endpoint state, even if it's empty initially.
-               // Note: BuildClusterLoadAssignment returns nil when no 
endpoints are found,
-               // but we still need to send an empty ClusterLoadAssignment to 
the client.
+               // NOTE: BuildClusterLoadAssignment never returns nil - it 
always returns either:
+               // 1. A valid ClusterLoadAssignment with endpoints, or
+               // 2. An empty ClusterLoadAssignment (via 
buildEmptyClusterLoadAssignment)
+               // The empty ClusterLoadAssignment has an explicit empty 
Endpoints list,
+               // which is critical for proxyless gRPC clients to know the 
endpoint state
+               // and prevent "weighted-target: no targets to pick from" 
errors.
+               // The nil check below is defensive programming, but should 
never be true in practice.
                if l == nil {
-                       // Build an empty ClusterLoadAssignment for this cluster
-                       // Use the same logic as 
endpoint_builder.go:buildEmptyClusterLoadAssignment
+                       // Defensive: Build an empty ClusterLoadAssignment if 
somehow nil is returned
+                       // This should never happen, but ensures we always send 
a valid response
                        l = &endpoint.ClusterLoadAssignment{
                                ClusterName: clusterName,
+                               Endpoints:   []*endpoint.LocalityLbEndpoints{}, 
// Empty endpoints list
                        }
+                       log.Warnf("buildEndpoints: BuildClusterLoadAssignment 
returned nil for cluster %s (hostname: %s), created empty CLA", clusterName, 
hostname)
                }
                regenerated++
 
+               // CRITICAL: Log EDS push details for proxyless gRPC to help 
diagnose "weighted-target: no targets to pick from" errors
                if len(l.Endpoints) == 0 {
                        empty++
+                       if proxy.IsProxylessGrpc() {
+                               log.Infof("buildEndpoints: proxyless gRPC, 
pushing empty EDS for cluster %s (hostname=%s, serviceWasUpdated=%v, 
shouldRegenerate=%v)",
+                                       clusterName, hostname, 
serviceWasUpdated, shouldRegenerate)
+                       }
+               } else {
+                       // Count total endpoints in the ClusterLoadAssignment
+                       totalEndpointsInCLA := 0
+                       for _, localityLbEp := range l.Endpoints {
+                               totalEndpointsInCLA += 
len(localityLbEp.LbEndpoints)
+                       }
+                       if proxy.IsProxylessGrpc() {
+                               log.Infof("buildEndpoints: proxyless gRPC, 
pushing EDS for cluster %s (hostname=%s, endpoints=%d, serviceWasUpdated=%v, 
shouldRegenerate=%v)",
+                                       clusterName, hostname, 
totalEndpointsInCLA, serviceWasUpdated, shouldRegenerate)
+                       }
                }
                resource := &discovery.Resource{
                        Name:     l.ClusterName,
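
Two details the eds.go hunks above rely on, shown with plain go-control-plane types: an empty ClusterLoadAssignment is still a valid EDS resource, and endpoint counts are summed across localities. A sketch under the assumption that only the public envoy API is needed (helper names are illustrative):

    package main

    import (
        "fmt"

        endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    )

    // emptyCLA builds a ClusterLoadAssignment with an explicitly empty,
    // non-nil Endpoints list; sending this (rather than omitting the
    // resource) is what lets proxyless gRPC clients drop stale endpoints.
    func emptyCLA(clusterName string) *endpoint.ClusterLoadAssignment {
        return &endpoint.ClusterLoadAssignment{
            ClusterName: clusterName,
            Endpoints:   []*endpoint.LocalityLbEndpoints{},
        }
    }

    // countEndpoints sums LbEndpoints across localities, matching how the
    // patch counts endpoints for its push logs.
    func countEndpoints(cla *endpoint.ClusterLoadAssignment) int {
        total := 0
        for _, loc := range cla.Endpoints {
            total += len(loc.LbEndpoints)
        }
        return total
    }

    func main() {
        cla := emptyCLA("outbound|20000||producer.default.svc.cluster.local")
        fmt.Printf("cluster=%s endpoints=%d\n", cla.ClusterName, countEndpoints(cla))
    }
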
diff --git a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
index e21f2f09..37878bb4 100644
--- a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
+++ b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
@@ -18,6 +18,8 @@
 package endpoints
 
 import (
+       "fmt"
+
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/networking/util"
        "github.com/apache/dubbo-kubernetes/pkg/config/host"
@@ -85,7 +87,9 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
        // Get endpoints from the endpoint index
        shards, ok := endpointIndex.ShardsForService(string(b.hostname), 
b.service.Attributes.Namespace)
        if !ok {
-               log.Warnf("BuildClusterLoadAssignment: no shards found for 
service %s in namespace %s (cluster=%s, port=%d, svcPort.Name='%s', 
svcPort.Port=%d)",
+               // CRITICAL: Log at INFO level for proxyless gRPC to help 
diagnose "weighted-target: no targets to pick from" errors
+               log.Infof("BuildClusterLoadAssignment: no shards found for 
service %s in namespace %s (cluster=%s, port=%d, svcPort.Name='%s', 
svcPort.Port=%d). "+
+                       "This usually means endpoints are not yet available or 
service is not registered.",
                        b.hostname, b.service.Attributes.Namespace, 
b.clusterName, b.port, svcPort.Name, svcPort.Port)
                return buildEmptyClusterLoadAssignment(b.clusterName)
        }
@@ -119,16 +123,46 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
        var lbEndpoints []*endpoint.LbEndpoint
        var filteredCount int
        var totalEndpoints int
+       var portNameMismatchCount int
+       var unhealthyCount int
+       var buildFailedCount int
+
+       // CRITICAL: Log all endpoint ServicePortNames for debugging port name 
matching issues
+       allServicePortNames := make(map[string]int)
+       for _, eps := range shards.Shards {
+               for _, ep := range eps {
+                       allServicePortNames[ep.ServicePortName]++
+               }
+       }
+       if len(allServicePortNames) > 0 {
+               portNamesList := make([]string, 0, len(allServicePortNames))
+               for name, count := range allServicePortNames {
+                       portNamesList = append(portNamesList, 
fmt.Sprintf("'%s'(%d)", name, count))
+               }
+               log.Infof("BuildClusterLoadAssignment: service %s port %d 
(svcPort.Name='%s'), found endpoints with ServicePortNames: %v",
+                       b.hostname, b.port, svcPort.Name, portNamesList)
+       }
 
        for _, eps := range shards.Shards {
                for _, ep := range eps {
                        totalEndpoints++
                        // Filter by port name
+                       // CRITICAL: According to Istio's implementation, we 
must match ServicePortName exactly
+                       // However, if ServicePortName is empty, we should 
still include the endpoint if there's only one port
+                       // This handles cases where EndpointSlice doesn't have 
port name but Service does
                        if ep.ServicePortName != svcPort.Name {
-                               filteredCount++
-                               log.Debugf("BuildClusterLoadAssignment: 
filtering out endpoint %s (port name mismatch: '%s' != '%s')",
-                                       ep.FirstAddressOrNil(), 
ep.ServicePortName, svcPort.Name)
-                               continue
+                               // Special case: if ServicePortName is empty 
and service has only one port, include it
+                               if ep.ServicePortName == "" && 
len(b.service.Ports) == 1 {
+                                       log.Debugf("BuildClusterLoadAssignment: 
including endpoint %s with empty ServicePortName (service has only one port 
'%s')",
+                                               ep.FirstAddressOrNil(), 
svcPort.Name)
+                                       // Continue processing this endpoint
+                               } else {
+                                       portNameMismatchCount++
+                                       filteredCount++
+                                       log.Warnf("BuildClusterLoadAssignment: 
filtering out endpoint %s (port name mismatch: ep.ServicePortName='%s' != 
svcPort.Name='%s', EndpointPort=%d)",
+                                               ep.FirstAddressOrNil(), 
ep.ServicePortName, svcPort.Name, ep.EndpointPort)
+                                       continue
+                               }
                        }
 
                        // Filter by subset labels if subset is specified
@@ -145,16 +179,19 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
                        // as the client will handle connection failures. 
However, this is controlled by the service
                        // configuration (publishNotReadyAddresses).
                        if !b.service.SupportsUnhealthyEndpoints() && 
ep.HealthStatus == model.UnHealthy {
-                               log.Debugf("BuildClusterLoadAssignment: 
filtering out unhealthy endpoint %s", ep.FirstAddressOrNil())
+                               unhealthyCount++
                                filteredCount++
+                               log.Debugf("BuildClusterLoadAssignment: 
filtering out unhealthy endpoint %s (HealthStatus=%v, 
SupportsUnhealthyEndpoints=%v)",
+                                       ep.FirstAddressOrNil(), 
ep.HealthStatus, b.service.SupportsUnhealthyEndpoints())
                                continue
                        }
 
                        // Build LbEndpoint
                        lbEp := b.buildLbEndpoint(ep)
                        if lbEp == nil {
-                               log.Debugf("BuildClusterLoadAssignment: 
buildLbEndpoint returned nil for endpoint %s", ep.FirstAddressOrNil())
+                               buildFailedCount++
                                filteredCount++
+                               log.Debugf("BuildClusterLoadAssignment: 
buildLbEndpoint returned nil for endpoint %s", ep.FirstAddressOrNil())
                                continue
                        }
                        lbEndpoints = append(lbEndpoints, lbEp)
@@ -162,8 +199,17 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
        }
 
        if len(lbEndpoints) == 0 {
-               log.Debugf("BuildClusterLoadAssignment: no endpoints found for 
cluster %s (hostname=%s, port=%d, totalEndpoints=%d, filteredCount=%d)",
-                       b.clusterName, b.hostname, b.port, totalEndpoints, 
filteredCount)
+               // CRITICAL: Log at WARN level for proxyless gRPC to help 
diagnose "weighted-target: no targets to pick from" errors
+               logLevel := log.Debugf
+               // For proxyless gRPC, log empty endpoints at INFO level to 
help diagnose connection issues
+               // This helps identify when endpoints are not available vs when 
they're filtered out
+               if totalEndpoints > 0 {
+                       logLevel = log.Warnf // If endpoints exist but were 
filtered, this is a warning
+               } else {
+                       logLevel = log.Infof // If no endpoints exist at all, 
this is informational
+               }
+               logLevel("BuildClusterLoadAssignment: no endpoints found for 
cluster %s (hostname=%s, port=%d, svcPort.Name='%s', svcPort.Port=%d, 
totalEndpoints=%d, filteredCount=%d, portNameMismatch=%d, unhealthy=%d, 
buildFailed=%d)",
+                       b.clusterName, b.hostname, b.port, svcPort.Name, 
svcPort.Port, totalEndpoints, filteredCount, portNameMismatchCount, 
unhealthyCount, buildFailedCount)
                return buildEmptyClusterLoadAssignment(b.clusterName)
        }
 
@@ -243,8 +289,12 @@ func (b *EndpointBuilder) buildLbEndpoint(ep *model.DubboEndpoint) *endpoint.LbE
 }
 
 func buildEmptyClusterLoadAssignment(clusterName string) 
*endpoint.ClusterLoadAssignment {
+       // CRITICAL FIX: Following Istio's pattern, empty ClusterLoadAssignment 
should have empty Endpoints list
+       // This ensures gRPC proxyless clients receive the update and clear 
their endpoint cache,
+       // preventing "weighted-target: no targets to pick from" errors
        return &endpoint.ClusterLoadAssignment{
                ClusterName: clusterName,
+               Endpoints:   []*endpoint.LocalityLbEndpoints{}, // Explicitly 
empty, not nil
        }
 }
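
The port-name filter above, including the new single-port exception, reduces to a small predicate. A standalone sketch (names are illustrative):

    package main

    import "fmt"

    // keepEndpointForPort reports whether an endpoint survives the port-name
    // filter in BuildClusterLoadAssignment: the endpoint's ServicePortName must
    // match the Service port name exactly, except that an empty ServicePortName
    // is accepted when the Service exposes only one port.
    func keepEndpointForPort(epPortName, svcPortName string, servicePortCount int) bool {
        if epPortName == svcPortName {
            return true
        }
        return epPortName == "" && servicePortCount == 1
    }

    func main() {
        fmt.Println(keepEndpointForPort("grpc", "grpc", 2)) // true: exact match
        fmt.Println(keepEndpointForPort("", "grpc", 1))     // true: single-port exception
        fmt.Println(keepEndpointForPort("", "grpc", 2))     // false: ambiguous, filtered out
    }
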
 
diff --git a/dubbod/planet/pkg/xds/xdsgen.go b/dubbod/planet/pkg/xds/xdsgen.go
index 58971e8f..5ba03709 100644
--- a/dubbod/planet/pkg/xds/xdsgen.go
+++ b/dubbod/planet/pkg/xds/xdsgen.go
@@ -34,6 +34,8 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/xds"
        cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
        core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+       listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+       hcmv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
        discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
 )
 
@@ -160,6 +162,36 @@ func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req
                                                }
                                        }
                                }
+                       } else if w.TypeUrl == v3.RouteType {
+                               // CRITICAL FIX: Extract route names from LDS 
response for RDS wildcard requests
+                               // RDS is not a wildcard type, so when client 
sends empty ResourceNames,
+                               // we need to extract route names from LDS 
listeners that reference RDS
+                               ldsWatched := 
con.proxy.GetWatchedResource(v3.ListenerType)
+                               if ldsWatched != nil && ldsWatched.NonceSent != 
"" {
+                                       // LDS has been sent, extract route 
names from it
+                                       log.Debugf("pushXds: RDS wildcard 
request, extracting route names from LDS")
+                                       // Create a temporary request to 
generate LDS and extract route names
+                                       ldsReq := &model.PushRequest{
+                                               Full:   true,
+                                               Push:   req.Push,
+                                               Reason: 
model.NewReasonStats(model.DependentResource),
+                                               Start:  con.proxy.LastPushTime,
+                                               Forced: false,
+                                       }
+                                       ldsGen := 
s.findGenerator(v3.ListenerType, con)
+                                       if ldsGen != nil {
+                                               ldsRes, _, _ := 
ldsGen.Generate(con.proxy, ldsWatched, ldsReq)
+                                               if len(ldsRes) > 0 {
+                                                       routeNames := 
extractRouteNamesFromLDS(ldsRes)
+                                                       if len(routeNames) > 0 {
+                                                               w.ResourceNames 
= sets.New(routeNames...)
+                                                               
requestedResourceNames = sets.New[string]()
+                                                               
requestedResourceNames.InsertAll(routeNames...)
+                                                               
log.Debugf("pushXds: extracted %d route names from LDS: %v", len(routeNames), 
routeNames)
+                                                       }
+                                               }
+                                       }
+                               }
                        }
                } else if len(w.ResourceNames) > 0 {
                        // Specific resource request
@@ -317,29 +349,44 @@ func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req
        case !req.Full:
        default:
                // Log format matches Istio: "LDS: PUSH for node:xxx 
resources:1 size:342B"
+               // CRITICAL: Always log resource names for better debugging, 
especially for proxyless gRPC
                resourceNamesStr := ""
-               if len(res) > 0 && len(res) <= 5 {
-                       // Log resource names if there are few resources (for 
debugging)
-                       names := make([]string, 0, len(res))
-                       for _, r := range res {
-                               names = append(names, r.Name)
+               if len(res) > 0 {
+                       if len(res) <= 10 {
+                               // Log all resource names if there are few 
resources
+                               names := make([]string, 0, len(res))
+                               for _, r := range res {
+                                       names = append(names, r.Name)
+                               }
+                               resourceNamesStr = fmt.Sprintf(" [%s]", 
strings.Join(names, ", "))
+                       } else {
+                               // If too many resources, show first 5 and count
+                               names := make([]string, 0, 5)
+                               for i := 0; i < 5 && i < len(res); i++ {
+                                       names = append(names, res[i].Name)
+                               }
+                               resourceNamesStr = fmt.Sprintf(" [%s, ... and 
%d more]", strings.Join(names, ", "), len(res)-5)
                        }
-                       resourceNamesStr = fmt.Sprintf(" [%s]", 
strings.Join(names, ", "))
+               } else {
+                       resourceNamesStr = " [empty]"
                }
                log.Infof("%s: %s for node:%s resources:%d size:%s%s%s", 
v3.GetShortType(w.TypeUrl), ptype, con.proxy.ID, len(res),
                        util.ByteCount(ResourceSize(res)), info, 
resourceNamesStr)
        }
 
        // CRITICAL FIX: For proxyless gRPC, after pushing LDS with outbound 
listeners,
-       // automatically trigger CDS push for the referenced clusters ONLY if 
this is a direct request push
-       // (not a push from pushConnection which would cause loops)
-       // Only auto-push if CDS is not already being watched by the client 
(client will request it naturally)
+       // automatically trigger CDS and RDS push for the referenced clusters 
and routes
+       // ONLY if this is a direct request push (not a push from 
pushConnection which would cause loops)
+       // Only auto-push if CDS/RDS is not already being watched by the client 
(client will request it naturally)
        if w.TypeUrl == v3.ListenerType && con.proxy.IsProxylessGrpc() && 
len(res) > 0 {
-               // Only auto-push CDS if this is a direct request (not a full 
push from pushConnection)
+               // Only auto-push CDS/RDS if this is a direct request (not a 
full push from pushConnection)
                // Check if this push was triggered by a direct client request 
using IsRequest()
                isDirectRequest := req.IsRequest()
                if isDirectRequest {
                        clusterNames := extractClusterNamesFromLDS(res)
+                       routeNames := extractRouteNamesFromLDS(res)
+
+                       // Auto-push CDS for referenced clusters
                        if len(clusterNames) > 0 {
                                cdsWatched := 
con.proxy.GetWatchedResource(v3.ClusterType)
                                // Only auto-push CDS if client hasn't already 
requested it
@@ -365,23 +412,95 @@ func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req
                                        log.Debugf("pushXds: LDS push 
completed, client already requested CDS, skipping auto-push")
                                }
                        }
+
+                       // CRITICAL FIX: Auto-push RDS for referenced routes
+                       // This is essential for gRPC proxyless - client needs 
RDS to route traffic correctly
+                       if len(routeNames) > 0 {
+                               rdsWatched := 
con.proxy.GetWatchedResource(v3.RouteType)
+                               // Only auto-push RDS if client hasn't already 
requested it
+                               if rdsWatched == nil || 
rdsWatched.ResourceNames == nil || len(rdsWatched.ResourceNames) == 0 {
+                                       // Client hasn't requested RDS yet, 
auto-push to ensure client gets the route config
+                                       
con.proxy.NewWatchedResource(v3.RouteType, routeNames)
+                                       log.Debugf("pushXds: LDS push 
completed, auto-pushing RDS for routes: %v", routeNames)
+                                       // Trigger RDS push directly without 
going through pushConnection to avoid loops
+                                       rdsReq := &model.PushRequest{
+                                               Full:   true,
+                                               Push:   req.Push,
+                                               Reason: 
model.NewReasonStats(model.ProxyRequest),
+                                               Start:  con.proxy.LastPushTime,
+                                               Forced: false,
+                                       }
+                                       if err := s.pushXds(con, 
con.proxy.GetWatchedResource(v3.RouteType), rdsReq); err != nil {
+                                               log.Warnf("pushXds: failed to 
push RDS after LDS: %v", err)
+                                       }
+                               } else {
+                                       // Check if any route names are missing 
from the watched set
+                                       existingNames := 
rdsWatched.ResourceNames
+                                       if existingNames == nil {
+                                               existingNames = 
sets.New[string]()
+                                       }
+                                       hasNewRoutes := false
+                                       for _, rn := range routeNames {
+                                               if !existingNames.Contains(rn) {
+                                                       hasNewRoutes = true
+                                                       break
+                                               }
+                                       }
+                                       if hasNewRoutes {
+                                               // Update RDS watched resource 
to include the new route names
+                                               
con.proxy.UpdateWatchedResource(v3.RouteType, func(wr *model.WatchedResource) 
*model.WatchedResource {
+                                                       if wr == nil {
+                                                               wr = 
&model.WatchedResource{TypeUrl: v3.RouteType, ResourceNames: sets.New[string]()}
+                                                       }
+                                                       existingNames := 
wr.ResourceNames
+                                                       if existingNames == nil 
{
+                                                               existingNames = 
sets.New[string]()
+                                                       }
+                                                       for _, rn := range 
routeNames {
+                                                               
existingNames.Insert(rn)
+                                                       }
+                                                       wr.ResourceNames = 
existingNames
+                                                       return wr
+                                               })
+                                               log.Debugf("pushXds: LDS push 
completed, updating RDS watched resource with new routes: %v", routeNames)
+                                               // Trigger RDS push for the new 
routes
+                                               rdsReq := &model.PushRequest{
+                                                       Full:   true,
+                                                       Push:   req.Push,
+                                                       Reason: 
model.NewReasonStats(model.DependentResource),
+                                                       Start:  
con.proxy.LastPushTime,
+                                                       Forced: false,
+                                               }
+                                               if err := s.pushXds(con, 
con.proxy.GetWatchedResource(v3.RouteType), rdsReq); err != nil {
+                                                       log.Warnf("pushXds: 
failed to push RDS after LDS: %v", err)
+                                               }
+                                       } else {
+                                               log.Debugf("pushXds: LDS push 
completed, RDS routes already watched: %v", routeNames)
+                                       }
+                               }
+                       }
                }
        }
 
        // CRITICAL FIX: For proxyless gRPC, after pushing CDS with EDS 
clusters,
-       // automatically trigger EDS push for the referenced endpoints
-       // This is necessary for load balancing - client needs EDS to discover 
all available endpoints
+       // we should NOT automatically push EDS before the client requests it.
+       // The gRPC xDS client will automatically request EDS after receiving 
CDS with EDS clusters.
+       // If we push EDS before the client requests it, the client will 
receive the response
+       // without having a state, causing "no state exists for it" warnings 
and "weighted-target: no targets to pick from" errors.
+       // Instead, we should:
+       // 1. Update the watched resource to include the EDS cluster names (so 
when client requests EDS, we know what to send)
+       // 2. Wait for the client to request EDS naturally
+       // 3. When the client requests EDS, we will push it with the correct 
state
        if w.TypeUrl == v3.ClusterType && con.proxy.IsProxylessGrpc() && 
len(res) > 0 {
                // Extract EDS cluster names from CDS resources
                edsClusterNames := extractEDSClusterNamesFromCDS(res)
                if len(edsClusterNames) > 0 {
                        edsWatched := 
con.proxy.GetWatchedResource(v3.EndpointType)
-                       shouldPushEDS := false
                        if edsWatched == nil {
-                               // EDS not watched yet, create watched resource
+                               // EDS not watched yet, create watched resource 
with cluster names
+                               // This ensures that when the client requests 
EDS, we know which clusters to send
                                con.proxy.NewWatchedResource(v3.EndpointType, 
edsClusterNames)
-                               shouldPushEDS = true
-                               log.Debugf("pushXds: CDS push completed, 
auto-pushing EDS for clusters: %v", edsClusterNames)
+                               log.Debugf("pushXds: CDS push completed, 
created EDS watched resource for clusters: %v (waiting for client request)", 
edsClusterNames)
                        } else {
                                // Check if any cluster names are missing from 
the watched set
                                existingNames := edsWatched.ResourceNames
@@ -411,27 +530,14 @@ func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req
                                                wr.ResourceNames = existingNames
                                                return wr
                                        })
-                                       shouldPushEDS = true
-                                       log.Debugf("pushXds: CDS push 
completed, updating EDS watched resource with new clusters: %v", 
edsClusterNames)
+                                       log.Debugf("pushXds: CDS push 
completed, updated EDS watched resource with new clusters: %v (waiting for 
client request)", edsClusterNames)
                                } else {
                                        log.Debugf("pushXds: CDS push 
completed, EDS clusters already watched: %v", edsClusterNames)
                                }
                        }
-
-                       // Only push EDS if we have new clusters to push
-                       if shouldPushEDS {
-                               // Trigger EDS push directly without going 
through pushConnection to avoid loops
-                               edsReq := &model.PushRequest{
-                                       Full:   true,
-                                       Push:   req.Push,
-                                       Reason: 
model.NewReasonStats(model.DependentResource),
-                                       Start:  con.proxy.LastPushTime,
-                                       Forced: true, // Force EDS push to 
ensure endpoints are available for load balancing
-                               }
-                               if err := s.pushXds(con, 
con.proxy.GetWatchedResource(v3.EndpointType), edsReq); err != nil {
-                                       log.Warnf("pushXds: failed to push EDS 
after CDS: %v", err)
-                               }
-                       }
+                       // CRITICAL: Do NOT push EDS here - wait for the client 
to request it naturally
+                       // The gRPC xDS client will automatically request EDS 
after receiving CDS with EDS clusters
+                       // Pushing EDS before the client requests it causes "no 
state exists" warnings
                }
        }
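
To make the "record now, push on request" ordering described in the comments above concrete, here is a minimal, hypothetical Go sketch. None of these types or functions exist in the repository; watchedEDS, RecordWatchedEDS, and OnEDSRequest are invented names used purely to illustrate that the server only remembers cluster names at CDS time and serves endpoints when the client's own EDS request arrives:

    package main

    import "fmt"

    // watchedEDS is a stand-in for the per-connection watched-resource state.
    type watchedEDS struct {
            clusters map[string]bool
    }

    // RecordWatchedEDS is called right after a CDS push: it only records which
    // EDS clusters the client is expected to ask for, without pushing anything.
    func (w *watchedEDS) RecordWatchedEDS(names []string) {
            for _, n := range names {
                    w.clusters[n] = true
            }
    }

    // OnEDSRequest is called when the client's own EDS request arrives; only
    // now is the endpoint response built and sent, so the client has state for it.
    func (w *watchedEDS) OnEDSRequest(name string) {
            if w.clusters[name] {
                    fmt.Printf("pushing endpoints for %s\n", name)
            }
    }

    func main() {
            w := &watchedEDS{clusters: map[string]bool{}}
            w.RecordWatchedEDS([]string{"outbound|8080||producer.default.svc.cluster.local"})
            // Later, the gRPC xDS client asks for the cluster it saw in CDS:
            w.OnEDSRequest("outbound|8080||producer.default.svc.cluster.local")
    }
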
 
@@ -487,6 +593,64 @@ func extractClusterNamesFromLDS(listeners model.Resources) []string {
        return clusterNames.UnsortedList()
 }
 
+// extractRouteNamesFromLDS extracts route names referenced in LDS listener resources
+// For outbound listeners with ApiListener, the route name is the RouteConfigName from Rds config
+// Route name format is the same as cluster name: "outbound|port||hostname"
+func extractRouteNamesFromLDS(listeners model.Resources) []string {
+       routeNames := sets.New[string]()
+       for _, r := range listeners {
+               // Unmarshal the listener resource to extract route names
+               ll := &listener.Listener{}
+               if err := r.Resource.UnmarshalTo(ll); err != nil {
+                       log.Debugf("extractRouteNamesFromLDS: failed to unmarshal listener %s: %v", r.Name, err)
+                       continue
+               }
+
+               // Check if this listener has ApiListener (used by gRPC proxyless for outbound)
+               if ll.ApiListener != nil && ll.ApiListener.ApiListener != nil {
+                       // Unmarshal ApiListener to get HttpConnectionManager
+                       hcm := &hcmv3.HttpConnectionManager{}
+                       if err := ll.ApiListener.ApiListener.UnmarshalTo(hcm); err != nil {
+                               log.Debugf("extractRouteNamesFromLDS: failed to unmarshal ApiListener for listener %s: %v", r.Name, err)
+                               continue
+                       }
+
+                       // Check if HttpConnectionManager uses RDS
+                       if hcm.RouteSpecifier != nil {
+                               if rds, ok := hcm.RouteSpecifier.(*hcmv3.HttpConnectionManager_Rds); ok && rds.Rds != nil {
+                                       // Found RDS reference, extract route name
+                                       routeName := rds.Rds.RouteConfigName
+                                       if routeName != "" {
+                                               routeNames.Insert(routeName)
+                                               log.Debugf("extractRouteNamesFromLDS: found route name %s from listener %s", routeName, r.Name)
+                                       }
+                               }
+                       }
+               }
+
+               // Fallback: If ApiListener extraction failed, try to extract from listener name
+               // This is a backup method in case the listener structure is different
+               // Outbound listener format: "hostname:port" -> route: "outbound|port||hostname"
+               listenerName := r.Name
+               if strings.Contains(listenerName, ":") && !strings.HasPrefix(listenerName, "xds.dubbo.apache.org/grpc/lds/inbound/") {
+                       // This is an outbound listener
+                       parts := strings.Split(listenerName, ":")
+                       if len(parts) == 2 {
+                               hostname := parts[0]
+                               portStr := parts[1]
+                               port, err := strconv.Atoi(portStr)
+                               if err == nil {
+                                       // Build route name: outbound|port||hostname (same format as cluster name)
+                                       routeName := model.BuildSubsetKey(model.TrafficDirectionOutbound, "", host.Name(hostname), port)
+                                       routeNames.Insert(routeName)
+                                       log.Debugf("extractRouteNamesFromLDS: extracted route name %s from listener name %s", routeName, listenerName)
+                               }
+                       }
+               }
+                               }
+                       }
+               }
+       }
+       return routeNames.UnsortedList()
+}
+
 func ResourceSize(r model.Resources) int {
        size := 0
        for _, r := range r {
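
For readers unfamiliar with the naming convention used by the fallback path in extractRouteNamesFromLDS above, the mapping from a listener name to a route/cluster name can be sketched in isolation. The snippet below is illustrative only: it uses a hypothetical hostname and plain string formatting in place of model.BuildSubsetKey, and it omits the inbound-prefix check, but it produces the same "outbound|port||hostname" shape described in the comments:

    package main

    import (
            "fmt"
            "strconv"
            "strings"
    )

    // routeNameFromListener mirrors the fallback path above: an outbound
    // listener named "hostname:port" maps to the route/cluster name
    // "outbound|port||hostname".
    func routeNameFromListener(listenerName string) (string, bool) {
            parts := strings.Split(listenerName, ":")
            if len(parts) != 2 {
                    return "", false
            }
            port, err := strconv.Atoi(parts[1])
            if err != nil {
                    return "", false
            }
            return fmt.Sprintf("outbound|%d||%s", port, parts[0]), true
    }

    func main() {
            // Hypothetical listener name, for illustration only.
            name, ok := routeNameFromListener("producer.default.svc.cluster.local:8080")
            fmt.Println(name, ok) // outbound|8080||producer.default.svc.cluster.local true
    }
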
diff --git a/tests/grpc-proxyless/consumer/main.go b/tests/grpc-proxyless/consumer/main.go
index 5e3e665d..4b8476ca 100644
--- a/tests/grpc-proxyless/consumer/main.go
+++ b/tests/grpc-proxyless/consumer/main.go
@@ -25,8 +25,10 @@ import (
        "net"
        "os"
        "os/signal"
+       "regexp"
        "strings"
        "syscall"
+       "time"
 
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
@@ -110,12 +112,39 @@ type grpcLogger struct {
        logger *log.Logger
 }
 
+var (
+       // Regex to match gRPC formatting errors like %!p(...)
+       formatErrorRegex = regexp.MustCompile(`%!p\([^)]+\)`)
+)
+
+// cleanMessage removes formatting errors from gRPC logs
+// Fixes issues like: "\u003c%!p(networktype.keyType=grpc.internal.transport.networktype)\u003e": "unix"
+func cleanMessage(msg string) string {
+       // Replace %!p(...) patterns with a cleaner representation
+       msg = formatErrorRegex.ReplaceAllStringFunc(msg, func(match string) string {
+               // Extract the key from %!p(networktype.keyType=...)
+               if strings.Contains(match, "networktype.keyType") {
+                       return `"networktype"`
+               }
+               // For other cases, just remove the error pattern
+               return ""
+       })
+       // Also clean up Unicode escape sequences that appear with formatting errors
+       // Replace \u003c (which is <) and \u003e (which is >) when they appear with formatting errors
+       msg = strings.ReplaceAll(msg, `\u003c`, "<")
+       msg = strings.ReplaceAll(msg, `\u003e`, ">")
+       // Clean up patterns like <...>: "unix" to just show the value
+       msg = regexp.MustCompile(`<[^>]*>:\s*"unix"`).ReplaceAllString(msg, `"networktype": "unix"`)
+       return msg
+}
+
 func (l *grpcLogger) Info(args ...interface{}) {
        msg := fmt.Sprint(args...)
        // Filter out xDS "entering mode: SERVING" logs
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       msg = cleanMessage(msg)
        l.logger.Print("INFO: ", msg)
 }
 
@@ -124,6 +153,7 @@ func (l *grpcLogger) Infoln(args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       msg = cleanMessage(msg)
        l.logger.Print("INFO: ", msg)
 }
 
@@ -132,19 +162,23 @@ func (l *grpcLogger) Infof(format string, args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       msg = cleanMessage(msg)
        l.logger.Printf("INFO: %s", msg)
 }
 
 func (l *grpcLogger) Warning(args ...interface{}) {
-       l.logger.Print("WARNING: ", fmt.Sprint(args...))
+       msg := cleanMessage(fmt.Sprint(args...))
+       l.logger.Print("WARNING: ", msg)
 }
 
 func (l *grpcLogger) Warningln(args ...interface{}) {
-       l.logger.Print("WARNING: ", fmt.Sprintln(args...))
+       msg := cleanMessage(fmt.Sprintln(args...))
+       l.logger.Print("WARNING: ", msg)
 }
 
 func (l *grpcLogger) Warningf(format string, args ...interface{}) {
-       l.logger.Printf("WARNING: %s", fmt.Sprintf(format, args...))
+       msg := cleanMessage(fmt.Sprintf(format, args...))
+       l.logger.Printf("WARNING: %s", msg)
 }
 
 func (l *grpcLogger) Error(args ...interface{}) {
@@ -153,6 +187,15 @@ func (l *grpcLogger) Error(args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       // Filter out common connection reset errors - these are normal network behavior
+       // when clients disconnect before completing the HTTP/2 handshake
+       if strings.Contains(msg, "connection reset by peer") ||
+               strings.Contains(msg, "failed to receive the preface from client") ||
+               strings.Contains(msg, "connection error") {
+               // These are normal network events, so suppress them instead of logging at ERROR
+               return
+       }
+       msg = cleanMessage(msg)
        l.logger.Print("ERROR: ", msg)
 }
 
@@ -161,6 +204,13 @@ func (l *grpcLogger) Errorln(args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       // Filter out common connection reset errors - these are normal network behavior
+       if strings.Contains(msg, "connection reset by peer") ||
+               strings.Contains(msg, "failed to receive the preface from client") ||
+               strings.Contains(msg, "connection error") {
+               return
+       }
+       msg = cleanMessage(msg)
        l.logger.Print("ERROR: ", msg)
 }
 
@@ -169,6 +219,13 @@ func (l *grpcLogger) Errorf(format string, args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       // Filter out common connection reset errors - these are normal network behavior
+       if strings.Contains(msg, "connection reset by peer") ||
+               strings.Contains(msg, "failed to receive the preface from client") ||
+               strings.Contains(msg, "connection error") {
+               return
+       }
+       msg = cleanMessage(msg)
        l.logger.Printf("ERROR: %s", msg)
 }
 
@@ -188,6 +245,35 @@ func (l *grpcLogger) V(level int) bool {
        return level <= 0
 }
 
+// waitForBootstrapFile waits for the grpc-bootstrap.json file to exist
+// This is necessary because the dubbo-proxy sidecar needs time to generate the file
+func waitForBootstrapFile(bootstrapPath string, maxWait time.Duration) error {
+       log.Printf("Waiting for bootstrap file to exist: %s (max wait: %v)", bootstrapPath, maxWait)
+
+       ctx, cancel := context.WithTimeout(context.Background(), maxWait)
+       defer cancel()
+
+       ticker := time.NewTicker(500 * time.Millisecond)
+       defer ticker.Stop()
+
+       startTime := time.Now()
+       for {
+               // Check if file exists and is not empty
+               if info, err := os.Stat(bootstrapPath); err == nil && info.Size() > 0 {
+                       log.Printf("Bootstrap file found after %v: %s", time.Since(startTime), bootstrapPath)
+                       return nil
+               }
+
+               // Check for timeout
+               select {
+               case <-ctx.Done():
+                       return fmt.Errorf("timeout waiting for bootstrap file: %s (waited %v)", bootstrapPath, time.Since(startTime))
+               case <-ticker.C:
+                       // Continue waiting
+               }
+       }
+}
+
 func main() {
        flag.Parse()
 
@@ -202,6 +288,19 @@ func main() {
                hostname = "unknown"
        }
 
+       // Get bootstrap file path from environment variable or use default
+       bootstrapPath := os.Getenv("GRPC_XDS_BOOTSTRAP")
+       if bootstrapPath == "" {
+               bootstrapPath = "/etc/dubbo/proxy/grpc-bootstrap.json"
+               log.Printf("GRPC_XDS_BOOTSTRAP not set, using default: %s", bootstrapPath)
+       }
+
+       // Wait for bootstrap file to exist before creating xDS server
+       // The dubbo-proxy sidecar needs time to generate this file
+       if err := waitForBootstrapFile(bootstrapPath, 60*time.Second); err != nil {
+               log.Fatalf("Failed to wait for bootstrap file: %v", err)
+       }
+
        // Create xDS-enabled gRPC server
        // For proxyless gRPC, we use xds.NewGRPCServer() instead of grpc.NewServer()
        creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
@@ -237,7 +336,14 @@ func main() {
                server.GracefulStop()
        }()
 
+       // Serve the gRPC server
+       // Note: server.Serve returns when the listener is closed, which is normal during shutdown
+       // Connection reset errors are handled by the gRPC library and logged separately
        if err := server.Serve(lis); err != nil {
-               log.Fatalf("Failed to serve: %v", err)
+               // Only log as fatal if it's not a normal shutdown (listener closed)
+               if !strings.Contains(err.Error(), "use of closed network connection") {
+                       log.Fatalf("Failed to serve: %v", err)
+               }
+               log.Printf("Server stopped: %v", err)
        }
 }
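
The bootstrap wait added to main() above is a plain poll-with-timeout. The following rough, self-contained sketch shows the same pattern (context deadline plus ticker) in isolation; the goroutine stands in for the dubbo-proxy sidecar writing the file, and the JSON payload is a placeholder, not the real bootstrap content:

    package main

    import (
            "context"
            "fmt"
            "os"
            "path/filepath"
            "time"
    )

    // waitForFile polls until path exists and is non-empty, or the timeout expires.
    func waitForFile(path string, maxWait time.Duration) error {
            ctx, cancel := context.WithTimeout(context.Background(), maxWait)
            defer cancel()

            ticker := time.NewTicker(100 * time.Millisecond)
            defer ticker.Stop()

            for {
                    if info, err := os.Stat(path); err == nil && info.Size() > 0 {
                            return nil
                    }
                    select {
                    case <-ctx.Done():
                            return fmt.Errorf("timeout waiting for %s", path)
                    case <-ticker.C:
                            // Continue polling.
                    }
            }
    }

    func main() {
            path := filepath.Join(os.TempDir(), "grpc-bootstrap.json")
            defer os.Remove(path)

            // Simulate the sidecar writing the bootstrap file after a short delay.
            go func() {
                    time.Sleep(300 * time.Millisecond)
                    _ = os.WriteFile(path, []byte(`{"xds_servers":[]}`), 0o644)
            }()

            if err := waitForFile(path, 5*time.Second); err != nil {
                    fmt.Println("error:", err)
                    return
            }
            fmt.Println("bootstrap file ready:", path)
    }
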
diff --git a/tests/grpc-proxyless/producer/main.go b/tests/grpc-proxyless/producer/main.go
index acc17116..d4952140 100644
--- a/tests/grpc-proxyless/producer/main.go
+++ b/tests/grpc-proxyless/producer/main.go
@@ -26,6 +26,7 @@ import (
        "net"
        "os"
        "os/signal"
+       "regexp"
        "strings"
        "sync"
        "syscall"
@@ -54,11 +55,38 @@ type grpcLogger struct {
        logger *log.Logger
 }
 
+var (
+       // Regex to match gRPC formatting errors like %!p(...)
+       formatErrorRegex = regexp.MustCompile(`%!p\([^)]+\)`)
+)
+
+// cleanMessage removes formatting errors from gRPC logs
+// Fixes issues like: "\u003c%!p(networktype.keyType=grpc.internal.transport.networktype)\u003e": "unix"
+func cleanMessage(msg string) string {
+       // Replace %!p(...) patterns with a cleaner representation
+       msg = formatErrorRegex.ReplaceAllStringFunc(msg, func(match string) string {
+               // Extract the key from %!p(networktype.keyType=...)
+               if strings.Contains(match, "networktype.keyType") {
+                       return `"networktype"`
+               }
+               // For other cases, just remove the error pattern
+               return ""
+       })
+       // Also clean up Unicode escape sequences that appear with formatting errors
+       // Replace \u003c (which is <) and \u003e (which is >) when they appear with formatting errors
+       msg = strings.ReplaceAll(msg, `\u003c`, "<")
+       msg = strings.ReplaceAll(msg, `\u003e`, ">")
+       // Clean up patterns like <...>: "unix" to just show the value
+       msg = regexp.MustCompile(`<[^>]*>:\s*"unix"`).ReplaceAllString(msg, `"networktype": "unix"`)
+       return msg
+}
+
 func (l *grpcLogger) Info(args ...interface{}) {
        msg := fmt.Sprint(args...)
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       msg = cleanMessage(msg)
        l.logger.Print("INFO: ", msg)
 }
 
@@ -67,6 +95,7 @@ func (l *grpcLogger) Infoln(args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       msg = cleanMessage(msg)
        l.logger.Print("INFO: ", msg)
 }
 
@@ -75,19 +104,23 @@ func (l *grpcLogger) Infof(format string, args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       msg = cleanMessage(msg)
        l.logger.Printf("INFO: %s", msg)
 }
 
 func (l *grpcLogger) Warning(args ...interface{}) {
-       l.logger.Print("WARNING: ", fmt.Sprint(args...))
+       msg := cleanMessage(fmt.Sprint(args...))
+       l.logger.Print("WARNING: ", msg)
 }
 
 func (l *grpcLogger) Warningln(args ...interface{}) {
-       l.logger.Print("WARNING: ", fmt.Sprintln(args...))
+       msg := cleanMessage(fmt.Sprintln(args...))
+       l.logger.Print("WARNING: ", msg)
 }
 
 func (l *grpcLogger) Warningf(format string, args ...interface{}) {
-       l.logger.Printf("WARNING: %s", fmt.Sprintf(format, args...))
+       msg := cleanMessage(fmt.Sprintf(format, args...))
+       l.logger.Printf("WARNING: %s", msg)
 }
 
 func (l *grpcLogger) Error(args ...interface{}) {
@@ -95,6 +128,7 @@ func (l *grpcLogger) Error(args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       msg = cleanMessage(msg)
        l.logger.Print("ERROR: ", msg)
 }
 
@@ -103,6 +137,7 @@ func (l *grpcLogger) Errorln(args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       msg = cleanMessage(msg)
        l.logger.Print("ERROR: ", msg)
 }
 
@@ -111,6 +146,7 @@ func (l *grpcLogger) Errorf(format string, args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, "SERVING") {
                return
        }
+       msg = cleanMessage(msg)
        l.logger.Printf("ERROR: %s", msg)
 }
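
The cleanMessage helper duplicated in both test binaries hinges on the %!p(...) pattern that Go's fmt package emits when a value is printed with an unsupported verb. A simplified, standalone sketch of just that substitution, run against the malformed fragment quoted in the comments above (it uses ReplaceAllString instead of the ReplaceAllStringFunc variant in the diff):

    package main

    import (
            "fmt"
            "regexp"
    )

    func main() {
            // Matches Go's "bad verb" output such as %!p(...).
            re := regexp.MustCompile(`%!p\([^)]+\)`)

            // Fragment taken from the garbled gRPC transport log described above.
            in := `%!p(networktype.keyType=grpc.internal.transport.networktype)`

            out := re.ReplaceAllString(in, `"networktype"`)
            fmt.Println(out) // "networktype"
    }
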
 
