This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new ac5dd3ed fix: eds unable to retrieve unhealthy endpoints (#819)
ac5dd3ed is described below

commit ac5dd3ede51854a4ccc08e4eda5985385ac39bfa
Author: Jian Zhong <[email protected]>
AuthorDate: Tue Nov 11 19:25:02 2025 +0800

    fix: eds unable to retrieve unhealthy endpoints (#819)
---
 .github/workflows/release.yaml                     |   2 +-
 dubbod/planet/pkg/model/endpointshards.go          |  24 +++++
 dubbod/planet/pkg/networking/grpcgen/cds.go        |  33 ++++---
 .../kube/controller/endpointslice.go               |  12 +++
 dubbod/planet/pkg/xds/eds.go                       | 108 +++++++++++++++++++--
 .../planet/pkg/xds/endpoints/endpoint_builder.go   |  63 +++++++++---
 pkg/art/art.go                                     |   2 +
 samples/grpc-app/grpc-app.yaml                     |   4 +-
 8 files changed, 205 insertions(+), 43 deletions(-)

diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index 45e700df..3fcb89f4 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -18,7 +18,7 @@ name: Dubbo Kubernetes Release
 on:
   push:
     tags:
-      - 'v*'
+      - '[0-9]+.[0-9]+.[0-9]+'
 
 permissions:
   contents: write
diff --git a/dubbod/planet/pkg/model/endpointshards.go b/dubbod/planet/pkg/model/endpointshards.go
index 08850337..c69179b3 100644
--- a/dubbod/planet/pkg/model/endpointshards.go
+++ b/dubbod/planet/pkg/model/endpointshards.go
@@ -132,6 +132,30 @@ func (e *EndpointIndex) ShardsForService(serviceName, namespace string) (*Endpoi
        return shards, ok
 }
 
+// AllServices returns all service names registered in the EndpointIndex
+func (e *EndpointIndex) AllServices() []string {
+       e.mu.RLock()
+       defer e.mu.RUnlock()
+       services := make([]string, 0, len(e.shardsBySvc))
+       for svcName := range e.shardsBySvc {
+               services = append(services, svcName)
+       }
+       return services
+}
+
+// ServicesInNamespace returns all service names in the given namespace
+func (e *EndpointIndex) ServicesInNamespace(namespace string) []string {
+       e.mu.RLock()
+       defer e.mu.RUnlock()
+       services := make([]string, 0)
+       for svcName, byNs := range e.shardsBySvc {
+               if _, ok := byNs[namespace]; ok {
+                       services = append(services, svcName)
+               }
+       }
+       return services
+}
+
 func (es *EndpointShards) CopyEndpoints(portMap map[string]int, ports sets.Set[int]) map[int][]*DubboEndpoint {
        es.RLock()
        defer es.RUnlock()
diff --git a/dubbod/planet/pkg/networking/grpcgen/cds.go b/dubbod/planet/pkg/networking/grpcgen/cds.go
index eb88dfdc..4381088b 100644
--- a/dubbod/planet/pkg/networking/grpcgen/cds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/cds.go
@@ -19,6 +19,7 @@ package grpcgen
 
 import (
        "fmt"
+
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/networking/util"
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
@@ -115,25 +116,25 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
        if b.filter.Contains(b.defaultClusterName) {
                defaultCluster = b.edsCluster(b.defaultClusterName)
                // CRITICAL: For gRPC proxyless, we need to set CommonLbConfig 
to handle endpoint health status
-               // This ensures that the cluster can use healthy endpoints for 
load balancing
+               // Following Istio's implementation, we should include 
UNHEALTHY and DRAINING endpoints
+               // in OverrideHostStatus so that clients can use them when 
healthy endpoints are not available.
+               // This prevents "weighted-target: no targets to pick from" 
errors when all endpoints are unhealthy.
+               // The client will prioritize HEALTHY endpoints but can fall 
back to UNHEALTHY/DRAINING if needed.
                if defaultCluster.CommonLbConfig == nil {
                        defaultCluster.CommonLbConfig = 
&cluster.Cluster_CommonLbConfig{}
                }
-               if b.svc.SupportsDrainingEndpoints() {
-                       // see core/v1alpha3/cluster.go
-                       defaultCluster.CommonLbConfig.OverrideHostStatus = 
&core.HealthStatusSet{
-                               Statuses: []core.HealthStatus{
-                                       core.HealthStatus_HEALTHY,
-                                       core.HealthStatus_DRAINING, 
core.HealthStatus_UNKNOWN, core.HealthStatus_DEGRADED,
-                               },
-                       }
-               } else {
-                       // For gRPC proxyless, only use HEALTHY endpoints by 
default
-                       defaultCluster.CommonLbConfig.OverrideHostStatus = 
&core.HealthStatusSet{
-                               Statuses: []core.HealthStatus{
-                                       core.HealthStatus_HEALTHY,
-                               },
-                       }
+               // CRITICAL FIX: Following Istio's implementation, always 
include UNHEALTHY and DRAINING
+               // in OverrideHostStatus. This allows clients to use unhealthy 
endpoints when healthy ones
+               // are not available, preventing "weighted-target: no targets 
to pick from" errors.
+               // The client will still prioritize HEALTHY endpoints, but can 
fall back to others.
+               defaultCluster.CommonLbConfig.OverrideHostStatus = 
&core.HealthStatusSet{
+                       Statuses: []core.HealthStatus{
+                               core.HealthStatus_HEALTHY,
+                               core.HealthStatus_UNHEALTHY,
+                               core.HealthStatus_DRAINING,
+                               core.HealthStatus_UNKNOWN,
+                               core.HealthStatus_DEGRADED,
+                       },
                }
        }
 
diff --git a/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go b/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
index ad02059f..04a0020f 100644
--- a/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
+++ b/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
@@ -550,6 +550,18 @@ func (esc *endpointSliceController) pushEDS(hostnames []host.Name, namespace str
 
        for _, hostname := range hostnames {
                endpoints := esc.endpointCache.get(hostname)
+               // CRITICAL: Log endpoint registration for debugging
+               log.Infof("pushEDS: registering %d endpoints for service %s in 
namespace %s (shard=%v)",
+                       len(endpoints), string(hostname), namespace, shard)
+               if len(endpoints) > 0 {
+                       // Log endpoint details for first few endpoints
+                       for i, ep := range endpoints {
+                               if i < 3 {
+                                       log.Infof("pushEDS: endpoint[%d] 
address=%s, port=%d, ServicePortName='%s', HealthStatus=%v",
+                                               i, ep.FirstAddressOrNil(), 
ep.EndpointPort, ep.ServicePortName, ep.HealthStatus)
+                               }
+                       }
+               }
                esc.c.opts.XDSUpdater.EDSUpdate(shard, string(hostname), 
namespace, endpoints)
        }
 }
diff --git a/dubbod/planet/pkg/xds/eds.go b/dubbod/planet/pkg/xds/eds.go
index f24d655b..7fb55686 100644
--- a/dubbod/planet/pkg/xds/eds.go
+++ b/dubbod/planet/pkg/xds/eds.go
@@ -100,6 +100,14 @@ func edsNeedsPush(req *model.PushRequest, proxy *model.Proxy) bool {
        if res, ok := xdsNeedsPush(req, proxy); ok {
                return res
        }
+       // CRITICAL FIX: For proxy requests (ProxyRequest reason), we should 
always push EDS
+       // This ensures that when a client explicitly requests EDS resources, 
we generate them
+       // even if there are no config updates. This is essential for proxyless 
gRPC clients
+       // that request specific EDS resources.
+       if req.Reason != nil && req.Reason.Has(model.ProxyRequest) {
+               log.Debugf("edsNeedsPush: ProxyRequest detected, pushing EDS 
even without config updates")
+               return true
+       }
        for config := range req.ConfigsUpdated {
                if !skippedEdsConfigs.Contains(config.Kind) {
                        return true
@@ -109,10 +117,17 @@ func edsNeedsPush(req *model.PushRequest, proxy *model.Proxy) bool {
 }
 
 func (eds *EdsGenerator) Generate(proxy *model.Proxy, w *model.WatchedResource, req *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
+       // CRITICAL: Log EDS generation attempt for debugging
+       log.Infof("EDS Generate: proxy=%s, watchedResources=%d, req.Full=%v, 
req.ConfigsUpdated=%v",
+               proxy.ID, len(w.ResourceNames), req.Full, 
len(req.ConfigsUpdated))
+
        if !edsNeedsPush(req, proxy) {
+               log.Infof("EDS Generate: edsNeedsPush returned false, skipping 
EDS generation for proxy=%s", proxy.ID)
                return nil, model.DefaultXdsLogDetails, nil
        }
        resources, logDetails := eds.buildEndpoints(proxy, req, w)
+       log.Infof("EDS Generate: built %d resources for proxy=%s (%s)",
+               len(resources), proxy.ID, logDetails.AdditionalInfo)
        return resources, logDetails, nil
 }
 
@@ -334,11 +349,31 @@ func (eds *EdsGenerator) buildDeltaEndpoints(proxy *model.Proxy,
        regenerated := 0
 
        for clusterName := range w.ResourceNames {
-               // filter out eds that are not updated for clusters
+               hostname := model.ParseSubsetKeyHostname(clusterName)
+
+               // CRITICAL FIX: For proxyless gRPC, we MUST always process all 
watched clusters
+               // This ensures clients receive EDS updates even when endpoints 
become available or unavailable
+               // For non-proxyless (Istio behavior), we skip clusters that 
are not in edsUpdatedServices for incremental pushes
+               serviceWasUpdated := false
                if len(edsUpdatedServices) > 0 {
-                       if _, ok := 
edsUpdatedServices[model.ParseSubsetKeyHostname(clusterName)]; !ok {
-                               continue
+                       if _, ok := edsUpdatedServices[hostname]; !ok {
+                               // Cluster was not in edsUpdatedServices
+                               // CRITICAL: For proxyless gRPC, always process 
all clusters to ensure EDS is up-to-date
+                               // This is essential because proxyless gRPC 
clients need to know endpoint state changes
+                               // even if the service wasn't explicitly 
updated in this push
+                               if proxy.IsProxylessGrpc() {
+                                       log.Debugf("buildDeltaEndpoints: 
proxyless gRPC, processing cluster %s even though not in edsUpdatedServices 
(hostname=%s)", clusterName, hostname)
+                                       serviceWasUpdated = true
+                               } else {
+                                       // For non-proxyless, skip if not 
updated (Istio behavior)
+                                       continue
+                               }
+                       } else {
+                               serviceWasUpdated = true
                        }
+               } else {
+                       // If edsUpdatedServices is nil, this is a full push - 
process all clusters
+                       serviceWasUpdated = true
                }
 
                builder := endpoints.NewEndpointBuilder(clusterName, proxy, 
req.Push)
@@ -353,8 +388,32 @@ func (eds *EdsGenerator) buildDeltaEndpoints(proxy *model.Proxy,
                        continue
                }
 
-               // Try to get from cache
-               if eds.Cache != nil {
+               // CRITICAL FIX: For proxyless gRPC, we must always regenerate 
EDS when serviceWasUpdated is true
+               // to ensure empty endpoints are sent when they become 
unavailable, and endpoints are sent when available.
+               // For incremental pushes (Full=false), we must always 
regenerate EDS
+               // if ConfigsUpdated contains this service, because endpoint 
health status may have changed.
+               // The cache may contain stale empty endpoints from when the 
endpoint was unhealthy.
+               shouldRegenerate := serviceWasUpdated
+               if !shouldRegenerate && !req.Full && req.ConfigsUpdated != nil {
+                       // For incremental pushes, check if any ServiceEntry in 
ConfigsUpdated matches this hostname
+                       for ckey := range req.ConfigsUpdated {
+                               if ckey.Kind == kind.ServiceEntry && ckey.Name 
== hostname {
+                                       shouldRegenerate = true
+                                       log.Debugf("buildDeltaEndpoints: 
forcing regeneration for cluster %s (hostname=%s) due to ConfigsUpdated", 
clusterName, hostname)
+                                       break
+                               }
+                       }
+               }
+
+               // CRITICAL: For proxyless gRPC, if serviceWasUpdated is true, 
we must regenerate
+               // even if cache exists, to ensure endpoints are always 
up-to-date
+               // This is critical for preventing "weighted-target: no targets 
to pick from" errors
+               if shouldRegenerate && proxy.IsProxylessGrpc() {
+                       log.Debugf("buildDeltaEndpoints: proxyless gRPC, 
forcing regeneration for cluster %s (hostname=%s, serviceWasUpdated=%v)", 
clusterName, hostname, serviceWasUpdated)
+               }
+
+               // Try to get from cache only if we don't need to regenerate
+               if !shouldRegenerate && eds.Cache != nil {
                        cachedEndpoint := eds.Cache.Get(builder)
                        if cachedEndpoint != nil {
                                resources = append(resources, cachedEndpoint)
@@ -365,14 +424,47 @@ func (eds *EdsGenerator) buildDeltaEndpoints(proxy *model.Proxy,
 
                // generate new eds cache
                l := builder.BuildClusterLoadAssignment(eds.EndpointIndex)
-               if l == nil || len(l.Endpoints) == 0 {
-                       removed = append(removed, clusterName)
-                       continue
+               // NOTE: BuildClusterLoadAssignment never returns nil - it 
always returns either:
+               // 1. A valid ClusterLoadAssignment with endpoints, or
+               // 2. An empty ClusterLoadAssignment (via 
buildEmptyClusterLoadAssignment)
+               // The empty ClusterLoadAssignment has an explicit empty 
Endpoints list,
+               // which is critical for proxyless gRPC clients to know the 
endpoint state
+               // and prevent "weighted-target: no targets to pick from" 
errors.
+               // The nil check below is defensive programming, but should 
never be true in practice.
+               if l == nil {
+                       // Defensive: Build an empty ClusterLoadAssignment if 
somehow nil is returned
+                       // This should never happen, but ensures we always send 
a valid response
+                       l = &endpoint.ClusterLoadAssignment{
+                               ClusterName: clusterName,
+                               Endpoints:   []*endpoint.LocalityLbEndpoints{}, 
// Empty endpoints list
+                       }
+                       log.Warnf("buildDeltaEndpoints: 
BuildClusterLoadAssignment returned nil for cluster %s (hostname: %s), created 
empty CLA", clusterName, hostname)
                }
                regenerated++
+
+               // CRITICAL: Log EDS push details for proxyless gRPC to help 
diagnose "weighted-target: no targets to pick from" errors
                if len(l.Endpoints) == 0 {
                        empty++
+                       if proxy.IsProxylessGrpc() {
+                               log.Infof("buildDeltaEndpoints: proxyless gRPC, 
pushing empty EDS for cluster %s (hostname=%s, serviceWasUpdated=%v, 
shouldRegenerate=%v)",
+                                       clusterName, hostname, 
serviceWasUpdated, shouldRegenerate)
+                       }
+               } else {
+                       // Count total endpoints in the ClusterLoadAssignment
+                       totalEndpointsInCLA := 0
+                       for _, localityLbEp := range l.Endpoints {
+                               totalEndpointsInCLA += 
len(localityLbEp.LbEndpoints)
+                       }
+                       if proxy.IsProxylessGrpc() {
+                               log.Infof("buildDeltaEndpoints: proxyless gRPC, 
pushing EDS for cluster %s (hostname=%s, endpoints=%d, serviceWasUpdated=%v, 
shouldRegenerate=%v)",
+                                       clusterName, hostname, 
totalEndpointsInCLA, serviceWasUpdated, shouldRegenerate)
+                       }
                }
+
+               // CRITICAL FIX: Always send ClusterLoadAssignment, even if 
empty
+               // This is essential for proxyless gRPC - clients need to know 
when endpoints are unavailable
+               // Removing the cluster from the response causes 
"weighted-target: no targets to pick from" errors
+               // because the client never receives an update indicating 
endpoints are empty
                resource := &discovery.Resource{
                        Name:     l.ClusterName,
                        Resource: protoconv.MessageToAny(l),
diff --git a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
index 37878bb4..f98693ea 100644
--- a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
+++ b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
@@ -85,12 +85,23 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
        }
 
        // Get endpoints from the endpoint index
+       // CRITICAL: Log all available services in EndpointIndex for debugging
+       allServices := endpointIndex.AllServices()
+       if len(allServices) > 0 {
+               log.Infof("BuildClusterLoadAssignment: EndpointIndex contains 
%d services: %v", len(allServices), allServices)
+       } else {
+               log.Warnf("BuildClusterLoadAssignment: EndpointIndex is empty - 
no services registered")
+       }
+
        shards, ok := endpointIndex.ShardsForService(string(b.hostname), 
b.service.Attributes.Namespace)
        if !ok {
                // CRITICAL: Log at INFO level for proxyless gRPC to help 
diagnose "weighted-target: no targets to pick from" errors
+               // Also log what services ARE available in the namespace
+               servicesInNamespace := 
endpointIndex.ServicesInNamespace(b.service.Attributes.Namespace)
                log.Infof("BuildClusterLoadAssignment: no shards found for 
service %s in namespace %s (cluster=%s, port=%d, svcPort.Name='%s', 
svcPort.Port=%d). "+
-                       "This usually means endpoints are not yet available or 
service is not registered.",
-                       b.hostname, b.service.Attributes.Namespace, 
b.clusterName, b.port, svcPort.Name, svcPort.Port)
+                       "This usually means endpoints are not yet available or 
service is not registered. "+
+                       "Available services in namespace: %v",
+                       b.hostname, b.service.Attributes.Namespace, 
b.clusterName, b.port, svcPort.Name, svcPort.Port, servicesInNamespace)
                return buildEmptyClusterLoadAssignment(b.clusterName)
        }
 
@@ -170,21 +181,24 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
                                continue
                        }
 
-                       // Filter out unhealthy endpoints unless service 
supports them
-                       // CRITICAL: Even if endpoint is unhealthy, we should 
include it if:
-                       // 1. Service has publishNotReadyAddresses=true 
(SupportsUnhealthyEndpoints returns true)
-                       // 2. Or if the endpoint is actually healthy 
(HealthStatus=Healthy)
+                       // CRITICAL FIX: Following Istio's implementation, we 
should ALWAYS include endpoints in EDS,
+                       // regardless of their health status. The client will 
decide whether to use them based on
+                       // OverrideHostStatus in the Cluster configuration.
                        //
-                       // For gRPC proxyless, we may want to include endpoints 
even if they're not ready initially,
-                       // as the client will handle connection failures. 
However, this is controlled by the service
-                       // configuration (publishNotReadyAddresses).
-                       if !b.service.SupportsUnhealthyEndpoints() && 
ep.HealthStatus == model.UnHealthy {
-                               unhealthyCount++
-                               filteredCount++
-                               log.Debugf("BuildClusterLoadAssignment: 
filtering out unhealthy endpoint %s (HealthStatus=%v, 
SupportsUnhealthyEndpoints=%v)",
-                                       ep.FirstAddressOrNil(), 
ep.HealthStatus, b.service.SupportsUnhealthyEndpoints())
-                               continue
-                       }
+                       // However, if the service explicitly doesn't support 
unhealthy endpoints (publishNotReadyAddresses=false),
+                       // we should still include them in EDS but mark them as 
UNHEALTHY. The client's OverrideHostStatus
+                       // will determine if they can be used.
+                       //
+                       // This is critical for proxyless gRPC - even if 
endpoints are unhealthy, they should be
+                       // included in EDS so the client knows they exist and 
can attempt to connect to them.
+                       // The client will handle connection failures 
appropriately.
+                       //
+                       // Note: We only filter out unhealthy endpoints if the 
service explicitly doesn't support them
+                       // AND we're not in a proxyless gRPC scenario. For 
proxyless gRPC, always include endpoints.
+                       // For non-proxyless (Envoy), we follow the service's 
publishNotReadyAddresses setting.
+                       // But since this is proxyless gRPC, we should always 
include endpoints.
+                       // Actually, let's follow Istio's behavior: always 
include endpoints, let the client decide.
+                       // The OverrideHostStatus in Cluster config will 
control whether unhealthy endpoints can be used.
 
                        // Build LbEndpoint
                        lbEp := b.buildLbEndpoint(ep)
@@ -214,6 +228,23 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
        }
 
        // Create LocalityLbEndpoints with empty locality (default)
+       // CRITICAL: Log endpoint health status for debugging
+       healthyCount := 0
+       unhealthyInLbCount := 0
+       drainingCount := 0
+       for _, lbEp := range lbEndpoints {
+               switch lbEp.HealthStatus {
+               case core.HealthStatus_HEALTHY:
+                       healthyCount++
+               case core.HealthStatus_UNHEALTHY:
+                       unhealthyInLbCount++
+               case core.HealthStatus_DRAINING:
+                       drainingCount++
+               }
+       }
+       log.Infof("BuildClusterLoadAssignment: cluster %s (hostname=%s, 
port=%d) - total endpoints: %d (healthy=%d, unhealthy=%d, draining=%d, 
SupportsUnhealthyEndpoints=%v)",
+               b.clusterName, b.hostname, b.port, len(lbEndpoints), 
healthyCount, unhealthyInLbCount, drainingCount, 
b.service.SupportsUnhealthyEndpoints())
+
        localityLbEndpoints := []*endpoint.LocalityLbEndpoints{
                {
                        Locality:    &core.Locality{},
diff --git a/pkg/art/art.go b/pkg/art/art.go
index 8ca16710..0e0582ef 100644
--- a/pkg/art/art.go
+++ b/pkg/art/art.go
@@ -22,6 +22,8 @@ import (
        "github.com/fatih/color"
 )
 
+// https://patorjk.com/software/taag/#p=display&f=Graffiti&t=Type+Something+&x=none&v=4&h=4&w=80&we=false
+//
 //go:embed dubbo-ascii.txt
 var dubboASCIIArt string
 
diff --git a/samples/grpc-app/grpc-app.yaml b/samples/grpc-app/grpc-app.yaml
index 5e4a382d..506b6405 100644
--- a/samples/grpc-app/grpc-app.yaml
+++ b/samples/grpc-app/grpc-app.yaml
@@ -50,7 +50,7 @@ spec:
     spec:
       containers:
         - name: app
-          image: dev-debug
+          image: mfordjody/grpc-consumer:dev-debug
           imagePullPolicy: Always
           ports:
             - containerPort: 17070
@@ -101,7 +101,7 @@ spec:
       containers:
         - name: app
           # Replace with your own image containing the producer binary
-          image: dev-debug
+          image: mfordjody/grpc-producer:dev-debug
           imagePullPolicy: Always
           # Optional: uncomment to test direct connection
           # - --target=xds:///consumer.grpc-app.svc.cluster.local:7070

Reply via email to