This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new ac5dd3ed fix: eds unable to retrieve unhealthy endpoints (#819)
ac5dd3ed is described below
commit ac5dd3ede51854a4ccc08e4eda5985385ac39bfa
Author: Jian Zhong <[email protected]>
AuthorDate: Tue Nov 11 19:25:02 2025 +0800
fix: eds unable to retrieve unhealthy endpoints (#819)
---
.github/workflows/release.yaml | 2 +-
dubbod/planet/pkg/model/endpointshards.go | 24 +++++
dubbod/planet/pkg/networking/grpcgen/cds.go | 33 ++++---
.../kube/controller/endpointslice.go | 12 +++
dubbod/planet/pkg/xds/eds.go | 108 +++++++++++++++++++--
.../planet/pkg/xds/endpoints/endpoint_builder.go | 63 +++++++++---
pkg/art/art.go | 2 +
samples/grpc-app/grpc-app.yaml | 4 +-
8 files changed, 205 insertions(+), 43 deletions(-)
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index 45e700df..3fcb89f4 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -18,7 +18,7 @@ name: Dubbo Kubernetes Release
on:
push:
tags:
- - 'v*'
+ - '[0-9]+.[0-9]+.[0-9]+'
permissions:
contents: write
diff --git a/dubbod/planet/pkg/model/endpointshards.go b/dubbod/planet/pkg/model/endpointshards.go
index 08850337..c69179b3 100644
--- a/dubbod/planet/pkg/model/endpointshards.go
+++ b/dubbod/planet/pkg/model/endpointshards.go
@@ -132,6 +132,30 @@ func (e *EndpointIndex) ShardsForService(serviceName, namespace string) (*Endpoi
return shards, ok
}
+// AllServices returns all service names registered in the EndpointIndex
+func (e *EndpointIndex) AllServices() []string {
+ e.mu.RLock()
+ defer e.mu.RUnlock()
+ services := make([]string, 0, len(e.shardsBySvc))
+ for svcName := range e.shardsBySvc {
+ services = append(services, svcName)
+ }
+ return services
+}
+
+// ServicesInNamespace returns all service names in the given namespace
+func (e *EndpointIndex) ServicesInNamespace(namespace string) []string {
+ e.mu.RLock()
+ defer e.mu.RUnlock()
+ services := make([]string, 0)
+ for svcName, byNs := range e.shardsBySvc {
+ if _, ok := byNs[namespace]; ok {
+ services = append(services, svcName)
+ }
+ }
+ return services
+}
+
func (es *EndpointShards) CopyEndpoints(portMap map[string]int, ports sets.Set[int]) map[int][]*DubboEndpoint {
es.RLock()
defer es.RUnlock()
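For reference, a minimal sketch of how the two new read-only helpers can be driven for diagnostics. The debugDump name and the stdlib logger are illustrative, not part of this commit:

    package sketch

    import (
        "log"

        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
    )

    // debugDump logs every service registered in the EndpointIndex, plus the
    // subset visible in a single namespace.
    func debugDump(idx *model.EndpointIndex, namespace string) {
        all := idx.AllServices()
        log.Printf("EndpointIndex holds %d services: %v", len(all), all)
        inNs := idx.ServicesInNamespace(namespace)
        log.Printf("%d of them in namespace %q: %v", len(inNs), namespace, inNs)
    }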
diff --git a/dubbod/planet/pkg/networking/grpcgen/cds.go b/dubbod/planet/pkg/networking/grpcgen/cds.go
index eb88dfdc..4381088b 100644
--- a/dubbod/planet/pkg/networking/grpcgen/cds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/cds.go
@@ -19,6 +19,7 @@ package grpcgen
import (
"fmt"
+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/networking/util"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
@@ -115,25 +116,25 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
if b.filter.Contains(b.defaultClusterName) {
defaultCluster = b.edsCluster(b.defaultClusterName)
// CRITICAL: For gRPC proxyless, we need to set CommonLbConfig to handle endpoint health status
- // This ensures that the cluster can use healthy endpoints for load balancing
+ // Following Istio's implementation, we should include UNHEALTHY and DRAINING endpoints
+ // in OverrideHostStatus so that clients can use them when healthy endpoints are not available.
+ // This prevents "weighted-target: no targets to pick from" errors when all endpoints are unhealthy.
+ // The client will prioritize HEALTHY endpoints but can fall back to UNHEALTHY/DRAINING if needed.
if defaultCluster.CommonLbConfig == nil {
defaultCluster.CommonLbConfig = &cluster.Cluster_CommonLbConfig{}
}
- if b.svc.SupportsDrainingEndpoints() {
- // see core/v1alpha3/cluster.go
- defaultCluster.CommonLbConfig.OverrideHostStatus = &core.HealthStatusSet{
- Statuses: []core.HealthStatus{
- core.HealthStatus_HEALTHY,
- core.HealthStatus_DRAINING, core.HealthStatus_UNKNOWN, core.HealthStatus_DEGRADED,
- },
- }
- } else {
- // For gRPC proxyless, only use HEALTHY endpoints by default
- defaultCluster.CommonLbConfig.OverrideHostStatus = &core.HealthStatusSet{
- Statuses: []core.HealthStatus{
- core.HealthStatus_HEALTHY,
- },
- }
+ // CRITICAL FIX: Following Istio's implementation, always include UNHEALTHY and DRAINING
+ // in OverrideHostStatus. This allows clients to use unhealthy endpoints when healthy ones
+ // are not available, preventing "weighted-target: no targets to pick from" errors.
+ // The client will still prioritize HEALTHY endpoints, but can fall back to others.
+ defaultCluster.CommonLbConfig.OverrideHostStatus = &core.HealthStatusSet{
+ Statuses: []core.HealthStatus{
+ core.HealthStatus_HEALTHY,
+ core.HealthStatus_UNHEALTHY,
+ core.HealthStatus_DRAINING,
+ core.HealthStatus_UNKNOWN,
+ core.HealthStatus_DEGRADED,
+ },
}
}
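For reference, the effective CommonLbConfig after this change looks like the sketch below, assuming the standard go-control-plane packages behind the cluster and core aliases in cds.go:

    package sketch

    import (
        clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
        corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    )

    // overrideHostStatus mirrors what build() now sets: with all five statuses
    // listed, the xDS client keeps UNHEALTHY and DRAINING hosts in its address
    // list instead of discarding them, while still preferring HEALTHY ones.
    func overrideHostStatus() *clusterv3.Cluster_CommonLbConfig {
        return &clusterv3.Cluster_CommonLbConfig{
            OverrideHostStatus: &corev3.HealthStatusSet{
                Statuses: []corev3.HealthStatus{
                    corev3.HealthStatus_HEALTHY,
                    corev3.HealthStatus_UNHEALTHY,
                    corev3.HealthStatus_DRAINING,
                    corev3.HealthStatus_UNKNOWN,
                    corev3.HealthStatus_DEGRADED,
                },
            },
        }
    }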
diff --git a/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go b/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
index ad02059f..04a0020f 100644
--- a/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
+++ b/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
@@ -550,6 +550,18 @@ func (esc *endpointSliceController) pushEDS(hostnames []host.Name, namespace str
for _, hostname := range hostnames {
endpoints := esc.endpointCache.get(hostname)
+ // CRITICAL: Log endpoint registration for debugging
+ log.Infof("pushEDS: registering %d endpoints for service %s in
namespace %s (shard=%v)",
+ len(endpoints), string(hostname), namespace, shard)
+ if len(endpoints) > 0 {
+ // Log endpoint details for first few endpoints
+ for i, ep := range endpoints {
+ if i < 3 {
+ log.Infof("pushEDS: endpoint[%d]
address=%s, port=%d, ServicePortName='%s', HealthStatus=%v",
+ i, ep.FirstAddressOrNil(),
ep.EndpointPort, ep.ServicePortName, ep.HealthStatus)
+ }
+ }
+ }
esc.c.opts.XDSUpdater.EDSUpdate(shard, string(hostname), namespace, endpoints)
}
}
diff --git a/dubbod/planet/pkg/xds/eds.go b/dubbod/planet/pkg/xds/eds.go
index f24d655b..7fb55686 100644
--- a/dubbod/planet/pkg/xds/eds.go
+++ b/dubbod/planet/pkg/xds/eds.go
@@ -100,6 +100,14 @@ func edsNeedsPush(req *model.PushRequest, proxy *model.Proxy) bool {
if res, ok := xdsNeedsPush(req, proxy); ok {
return res
}
+ // CRITICAL FIX: For proxy requests (ProxyRequest reason), we should always push EDS
+ // This ensures that when a client explicitly requests EDS resources, we generate them
+ // even if there are no config updates. This is essential for proxyless gRPC clients
+ // that request specific EDS resources.
+ if req.Reason != nil && req.Reason.Has(model.ProxyRequest) {
+ log.Debugf("edsNeedsPush: ProxyRequest detected, pushing EDS even without config updates")
+ return true
+ }
for config := range req.ConfigsUpdated {
if !skippedEdsConfigs.Contains(config.Kind) {
return true
@@ -109,10 +117,17 @@ func edsNeedsPush(req *model.PushRequest, proxy *model.Proxy) bool {
}
func (eds *EdsGenerator) Generate(proxy *model.Proxy, w *model.WatchedResource, req *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
+ // CRITICAL: Log EDS generation attempt for debugging
+ log.Infof("EDS Generate: proxy=%s, watchedResources=%d, req.Full=%v,
req.ConfigsUpdated=%v",
+ proxy.ID, len(w.ResourceNames), req.Full,
len(req.ConfigsUpdated))
+
if !edsNeedsPush(req, proxy) {
+ log.Infof("EDS Generate: edsNeedsPush returned false, skipping
EDS generation for proxy=%s", proxy.ID)
return nil, model.DefaultXdsLogDetails, nil
}
resources, logDetails := eds.buildEndpoints(proxy, req, w)
+ log.Infof("EDS Generate: built %d resources for proxy=%s (%s)",
+ len(resources), proxy.ID, logDetails.AdditionalInfo)
return resources, logDetails, nil
}
@@ -334,11 +349,31 @@ func (eds *EdsGenerator) buildDeltaEndpoints(proxy *model.Proxy,
regenerated := 0
for clusterName := range w.ResourceNames {
- // filter out eds that are not updated for clusters
+ hostname := model.ParseSubsetKeyHostname(clusterName)
+
+ // CRITICAL FIX: For proxyless gRPC, we MUST always process all watched clusters
+ // This ensures clients receive EDS updates even when endpoints become available or unavailable
+ // For non-proxyless (Istio behavior), we skip clusters that are not in edsUpdatedServices for incremental pushes
+ serviceWasUpdated := false
if len(edsUpdatedServices) > 0 {
- if _, ok := edsUpdatedServices[model.ParseSubsetKeyHostname(clusterName)]; !ok {
- continue
+ if _, ok := edsUpdatedServices[hostname]; !ok {
+ // Cluster was not in edsUpdatedServices
+ // CRITICAL: For proxyless gRPC, always process all clusters to ensure EDS is up-to-date
+ // This is essential because proxyless gRPC clients need to know endpoint state changes
+ // even if the service wasn't explicitly updated in this push
+ if proxy.IsProxylessGrpc() {
+ log.Debugf("buildDeltaEndpoints:
proxyless gRPC, processing cluster %s even though not in edsUpdatedServices
(hostname=%s)", clusterName, hostname)
+ serviceWasUpdated = true
+ } else {
+ // For non-proxyless, skip if not updated (Istio behavior)
+ continue
+ }
+ } else {
+ serviceWasUpdated = true
}
+ } else {
+ // If edsUpdatedServices is nil, this is a full push - process all clusters
+ serviceWasUpdated = true
}
builder := endpoints.NewEndpointBuilder(clusterName, proxy, req.Push)
@@ -353,8 +388,32 @@ func (eds *EdsGenerator) buildDeltaEndpoints(proxy *model.Proxy,
continue
}
- // Try to get from cache
- if eds.Cache != nil {
+ // CRITICAL FIX: For proxyless gRPC, we must always regenerate EDS when serviceWasUpdated is true
+ // to ensure empty endpoints are sent when they become unavailable, and endpoints are sent when available.
+ // For incremental pushes (Full=false), we must always regenerate EDS
+ // if ConfigsUpdated contains this service, because endpoint health status may have changed.
+ // The cache may contain stale empty endpoints from when the endpoint was unhealthy.
+ shouldRegenerate := serviceWasUpdated
+ if !shouldRegenerate && !req.Full && req.ConfigsUpdated != nil {
+ // For incremental pushes, check if any ServiceEntry in ConfigsUpdated matches this hostname
+ for ckey := range req.ConfigsUpdated {
+ if ckey.Kind == kind.ServiceEntry && ckey.Name
== hostname {
+ shouldRegenerate = true
+ log.Debugf("buildDeltaEndpoints:
forcing regeneration for cluster %s (hostname=%s) due to ConfigsUpdated",
clusterName, hostname)
+ break
+ }
+ }
+ }
+
+ // CRITICAL: For proxyless gRPC, if serviceWasUpdated is true, we must regenerate
+ // even if cache exists, to ensure endpoints are always up-to-date
+ // This is critical for preventing "weighted-target: no targets to pick from" errors
+ if shouldRegenerate && proxy.IsProxylessGrpc() {
+ log.Debugf("buildDeltaEndpoints: proxyless gRPC,
forcing regeneration for cluster %s (hostname=%s, serviceWasUpdated=%v)",
clusterName, hostname, serviceWasUpdated)
+ }
+
+ // Try to get from cache only if we don't need to regenerate
+ if !shouldRegenerate && eds.Cache != nil {
cachedEndpoint := eds.Cache.Get(builder)
if cachedEndpoint != nil {
resources = append(resources, cachedEndpoint)
@@ -365,14 +424,47 @@ func (eds *EdsGenerator) buildDeltaEndpoints(proxy *model.Proxy,
// generate new eds cache
l := builder.BuildClusterLoadAssignment(eds.EndpointIndex)
- if l == nil || len(l.Endpoints) == 0 {
- removed = append(removed, clusterName)
- continue
+ // NOTE: BuildClusterLoadAssignment never returns nil - it always returns either:
+ // 1. A valid ClusterLoadAssignment with endpoints, or
+ // 2. An empty ClusterLoadAssignment (via buildEmptyClusterLoadAssignment)
+ // The empty ClusterLoadAssignment has an explicit empty Endpoints list,
+ // which is critical for proxyless gRPC clients to know the endpoint state
+ // and prevent "weighted-target: no targets to pick from" errors.
+ // The nil check below is defensive programming, but should never be true in practice.
+ if l == nil {
+ // Defensive: Build an empty ClusterLoadAssignment if somehow nil is returned
+ // This should never happen, but ensures we always send a valid response
+ l = &endpoint.ClusterLoadAssignment{
+ ClusterName: clusterName,
+ Endpoints: []*endpoint.LocalityLbEndpoints{}, // Empty endpoints list
+ }
+ log.Warnf("buildDeltaEndpoints: BuildClusterLoadAssignment returned nil for cluster %s (hostname: %s), created empty CLA", clusterName, hostname)
}
regenerated++
+
+ // CRITICAL: Log EDS push details for proxyless gRPC to help diagnose "weighted-target: no targets to pick from" errors
if len(l.Endpoints) == 0 {
empty++
+ if proxy.IsProxylessGrpc() {
+ log.Infof("buildDeltaEndpoints: proxyless gRPC,
pushing empty EDS for cluster %s (hostname=%s, serviceWasUpdated=%v,
shouldRegenerate=%v)",
+ clusterName, hostname,
serviceWasUpdated, shouldRegenerate)
+ }
+ } else {
+ // Count total endpoints in the ClusterLoadAssignment
+ totalEndpointsInCLA := 0
+ for _, localityLbEp := range l.Endpoints {
+ totalEndpointsInCLA += len(localityLbEp.LbEndpoints)
+ }
+ if proxy.IsProxylessGrpc() {
+ log.Infof("buildDeltaEndpoints: proxyless gRPC,
pushing EDS for cluster %s (hostname=%s, endpoints=%d, serviceWasUpdated=%v,
shouldRegenerate=%v)",
+ clusterName, hostname,
totalEndpointsInCLA, serviceWasUpdated, shouldRegenerate)
+ }
}
+
+ // CRITICAL FIX: Always send ClusterLoadAssignment, even if empty
+ // This is essential for proxyless gRPC - clients need to know when endpoints are unavailable
+ // Removing the cluster from the response causes "weighted-target: no targets to pick from" errors
+ // because the client never receives an update indicating endpoints are empty
resource := &discovery.Resource{
Name: l.ClusterName,
Resource: protoconv.MessageToAny(l),
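The invariant this hunk enforces can be summarized in a few lines. emptyCLA is an illustrative stand-in for the builder's buildEmptyClusterLoadAssignment; the import assumes the go-control-plane endpoint package behind the endpoint alias:

    package sketch

    import (
        endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    )

    // emptyCLA returns a ClusterLoadAssignment with an explicit empty endpoint
    // list. Sending this tells the client "zero endpoints right now", which is
    // different from dropping the resource entirely - dropping it is what left
    // clients stuck with "weighted-target: no targets to pick from".
    func emptyCLA(clusterName string) *endpointv3.ClusterLoadAssignment {
        return &endpointv3.ClusterLoadAssignment{
            ClusterName: clusterName,
            Endpoints:   []*endpointv3.LocalityLbEndpoints{},
        }
    }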
diff --git a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
index 37878bb4..f98693ea 100644
--- a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
+++ b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
@@ -85,12 +85,23 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
}
// Get endpoints from the endpoint index
+ // CRITICAL: Log all available services in EndpointIndex for debugging
+ allServices := endpointIndex.AllServices()
+ if len(allServices) > 0 {
+ log.Infof("BuildClusterLoadAssignment: EndpointIndex contains
%d services: %v", len(allServices), allServices)
+ } else {
+ log.Warnf("BuildClusterLoadAssignment: EndpointIndex is empty -
no services registered")
+ }
+
shards, ok := endpointIndex.ShardsForService(string(b.hostname), b.service.Attributes.Namespace)
if !ok {
// CRITICAL: Log at INFO level for proxyless gRPC to help diagnose "weighted-target: no targets to pick from" errors
+ // Also log what services ARE available in the namespace
+ servicesInNamespace := endpointIndex.ServicesInNamespace(b.service.Attributes.Namespace)
log.Infof("BuildClusterLoadAssignment: no shards found for
service %s in namespace %s (cluster=%s, port=%d, svcPort.Name='%s',
svcPort.Port=%d). "+
- "This usually means endpoints are not yet available or
service is not registered.",
- b.hostname, b.service.Attributes.Namespace,
b.clusterName, b.port, svcPort.Name, svcPort.Port)
+ "This usually means endpoints are not yet available or
service is not registered. "+
+ "Available services in namespace: %v",
+ b.hostname, b.service.Attributes.Namespace,
b.clusterName, b.port, svcPort.Name, svcPort.Port, servicesInNamespace)
return buildEmptyClusterLoadAssignment(b.clusterName)
}
@@ -170,21 +181,24 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
continue
}
- // Filter out unhealthy endpoints unless service supports them
- // CRITICAL: Even if endpoint is unhealthy, we should include it if:
- // 1. Service has publishNotReadyAddresses=true (SupportsUnhealthyEndpoints returns true)
- // 2. Or if the endpoint is actually healthy (HealthStatus=Healthy)
+ // CRITICAL FIX: Following Istio's implementation, we should ALWAYS include endpoints in EDS,
+ // regardless of their health status. The client will decide whether to use them based on
+ // OverrideHostStatus in the Cluster configuration.
//
- // For gRPC proxyless, we may want to include endpoints even if they're not ready initially,
- // as the client will handle connection failures. However, this is controlled by the service
- // configuration (publishNotReadyAddresses).
- if !b.service.SupportsUnhealthyEndpoints() && ep.HealthStatus == model.UnHealthy {
- unhealthyCount++
- filteredCount++
- log.Debugf("BuildClusterLoadAssignment: filtering out unhealthy endpoint %s (HealthStatus=%v, SupportsUnhealthyEndpoints=%v)",
- ep.FirstAddressOrNil(), ep.HealthStatus, b.service.SupportsUnhealthyEndpoints())
- continue
- }
+ // However, even when the service explicitly doesn't support unhealthy endpoints
+ // (publishNotReadyAddresses=false), we still include them in EDS marked as UNHEALTHY.
+ // The client's OverrideHostStatus will determine if they can be used.
+ //
+ // This is critical for proxyless gRPC - even if endpoints are unhealthy, they should be
+ // included in EDS so the client knows they exist and can attempt to connect to them,
+ // handling connection failures itself. This follows Istio's behavior: always include
+ // endpoints and let the client decide via OverrideHostStatus in the Cluster config.
// Build LbEndpoint
lbEp := b.buildLbEndpoint(ep)
@@ -214,6 +228,23 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
}
// Create LocalityLbEndpoints with empty locality (default)
+ // CRITICAL: Log endpoint health status for debugging
+ healthyCount := 0
+ unhealthyInLbCount := 0
+ drainingCount := 0
+ for _, lbEp := range lbEndpoints {
+ switch lbEp.HealthStatus {
+ case core.HealthStatus_HEALTHY:
+ healthyCount++
+ case core.HealthStatus_UNHEALTHY:
+ unhealthyInLbCount++
+ case core.HealthStatus_DRAINING:
+ drainingCount++
+ }
+ }
+ log.Infof("BuildClusterLoadAssignment: cluster %s (hostname=%s,
port=%d) - total endpoints: %d (healthy=%d, unhealthy=%d, draining=%d,
SupportsUnhealthyEndpoints=%v)",
+ b.clusterName, b.hostname, b.port, len(lbEndpoints),
healthyCount, unhealthyInLbCount, drainingCount,
b.service.SupportsUnhealthyEndpoints())
+
localityLbEndpoints := []*endpoint.LocalityLbEndpoints{
{
Locality: &core.Locality{},
diff --git a/pkg/art/art.go b/pkg/art/art.go
index 8ca16710..0e0582ef 100644
--- a/pkg/art/art.go
+++ b/pkg/art/art.go
@@ -22,6 +22,8 @@ import (
"github.com/fatih/color"
)
+// https://patorjk.com/software/taag/#p=display&f=Graffiti&t=Type+Something+&x=none&v=4&h=4&w=80&we=false
+//
//go:embed dubbo-ascii.txt
var dubboASCIIArt string
diff --git a/samples/grpc-app/grpc-app.yaml b/samples/grpc-app/grpc-app.yaml
index 5e4a382d..506b6405 100644
--- a/samples/grpc-app/grpc-app.yaml
+++ b/samples/grpc-app/grpc-app.yaml
@@ -50,7 +50,7 @@ spec:
spec:
containers:
- name: app
- image: dev-debug
+ image: mfordjody/grpc-consumer:dev-debug
imagePullPolicy: Always
ports:
- containerPort: 17070
@@ -101,7 +101,7 @@ spec:
containers:
- name: app
# Replace with your own image containing the producer binary
- image: dev-debug
+ image: mfordjody/grpc-producer:dev-debug
imagePullPolicy: Always
# Optional: uncomment to test direct connection
# - --target=xds:///consumer.grpc-app.svc.cluster.local:7070
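For completeness, a minimal proxyless client for the commented target above might look like the sketch below. It assumes a valid xDS bootstrap is in place (GRPC_XDS_BOOTSTRAP) and plaintext transport credentials; the blank google.golang.org/grpc/xds import registers the xds:/// resolver:

    package main

    import (
        "log"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
        _ "google.golang.org/grpc/xds" // registers the xds:/// resolver and LB policies
    )

    func main() {
        conn, err := grpc.NewClient("xds:///consumer.grpc-app.svc.cluster.local:7070",
            grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
            log.Fatalf("create client: %v", err)
        }
        defer conn.Close()
        // With the CDS/EDS fixes above, the client's weighted-target picker still
        // has UNHEALTHY/DRAINING addresses to fall back on instead of erroring out.
    }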