This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new e9fcd78e fix eds endpoint push error (#815)
e9fcd78e is described below
commit e9fcd78e0735a7e04adf9f9efff6c079c475c6a9
Author: Jian Zhong <[email protected]>
AuthorDate: Sun Nov 9 04:38:02 2025 +0800
fix eds endpoint push error (#815)
---
README.md | 4 +-
dubbod/planet/pkg/model/endpointshards.go | 42 +++-
.../kube/controller/endpointslice.go | 66 +++++-
dubbod/planet/pkg/xds/ads.go | 61 ++++--
dubbod/planet/pkg/xds/eds.go | 91 +++++++-
.../planet/pkg/xds/endpoints/endpoint_builder.go | 68 +++++-
dubbod/planet/pkg/xds/xdsgen.go | 230 ++++++++++++++++++---
tests/grpc-proxyless/consumer/main.go | 114 +++++++++-
tests/grpc-proxyless/producer/main.go | 42 +++-
9 files changed, 629 insertions(+), 89 deletions(-)
diff --git a/README.md b/README.md
index 970edb7e..d8d276dc 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,9 @@
Dubbo Service Mesh for Kubernetes
</h1>
-[](https://codecov.io/gh/apache/dubbo-kubernetes)
+[](https://goreportcard.com/report/github.com/apache/dubbo-kubernetes)
+[](https://pkg.go.dev/github.com/apache/dubbo-kubernetes)
+[](https://github.com/apache/dubbo-kubernetes/releases)

Cloud-Native proxyless service mesh platform for Dubbo microservices, enabling
zero-latency inter-service communication across multiple protocols, advanced
traffic management, and enterprise-grade security without sidecar proxies.
Built on Istio's control plane architecture with native Kubernetes integration,
it delivers service mesh capabilities with minimal resource overhead and
maximum performance.
diff --git a/dubbod/planet/pkg/model/endpointshards.go b/dubbod/planet/pkg/model/endpointshards.go
index f93d57fb..08850337 100644
--- a/dubbod/planet/pkg/model/endpointshards.go
+++ b/dubbod/planet/pkg/model/endpointshards.go
@@ -61,8 +61,27 @@ func (e *EndpointIndex) clearCacheForService(svc, ns string) {
 }
 func endpointUpdateRequiresPush(oldDubboEndpoints []*DubboEndpoint, incomingEndpoints []*DubboEndpoint) ([]*DubboEndpoint, bool) {
-	if oldDubboEndpoints == nil {
+	// CRITICAL FIX: If old endpoints are empty (nil or empty slice), and we have new endpoints, we must push
+	// This ensures that when endpoints become available after being empty, clients receive the update
+	// This is critical for proxyless gRPC - when endpoints become available, clients must be notified
+	// to prevent "weighted-target: no targets to pick from" errors
+	// Note: len() for nil slices is defined as zero, so we can use len() directly
+	oldWasEmpty := len(oldDubboEndpoints) == 0
+	newIsEmpty := len(incomingEndpoints) == 0
+
+	if oldWasEmpty {
 		// If there are no old endpoints, we should push with incoming endpoints as there is nothing to compare.
+		// CRITICAL: Even if new endpoints are unhealthy, we must push to notify clients that endpoints are now available
+		// The client will handle unhealthy endpoints appropriately (e.g., retry, circuit breaker)
+		if !newIsEmpty {
+			return incomingEndpoints, true
+		}
+		// If both old and new are empty, no push needed
+		return incomingEndpoints, false
+	}
+	// CRITICAL FIX: If old endpoints exist but new endpoints are empty, we must push
+	// This ensures that when endpoints become unavailable, clients receive empty ClusterLoadAssignment
+	if newIsEmpty {
 		return incomingEndpoints, true
 	}
 	needPush := false
@@ -82,6 +101,7 @@ func endpointUpdateRequiresPush(oldDubboEndpoints []*DubboEndpoint, incomingEndp
 			}
 			newDubboEndpoints = append(newDubboEndpoints, nie)
 		} else {
+			// New endpoint that didn't exist before - always push if it's healthy or should be sent
 			if nie.HealthStatus != UnHealthy || nie.SendUnhealthyEndpoints {
 				needPush = true
 			}
@@ -89,6 +109,7 @@ func endpointUpdateRequiresPush(oldDubboEndpoints []*DubboEndpoint, incomingEndp
 		}
 	}
 	if !needPush {
+		// Check if any old endpoints were removed
 		for _, oie := range oldDubboEndpoints {
 			if _, f := nmap[oie.Key()]; !f {
 				needPush = true
@@ -175,8 +196,23 @@ func (e *EndpointIndex) UpdateServiceEndpoints(
 	ep.Lock()
 	defer ep.Unlock()
 	oldDubboEndpoints := ep.Shards[shard]
+	// CRITICAL: Check if this is a transition from empty to non-empty BEFORE calling endpointUpdateRequiresPush
+	// This ensures we always push when endpoints become available, even if they're unhealthy
+	// Note: len() for nil slices is defined as zero, so we can use len() directly
+	oldWasEmpty := len(oldDubboEndpoints) == 0
+	newIsEmpty := len(dubboEndpoints) == 0
+	transitionFromEmptyToNonEmpty := oldWasEmpty && !newIsEmpty
+
 	newDubboEndpoints, needPush := endpointUpdateRequiresPush(oldDubboEndpoints, dubboEndpoints)
+	// CRITICAL FIX: If endpoints transition from empty to non-empty, we MUST push
+	// This is essential for proxyless gRPC - clients need to know endpoints are now available
+	// even if they're initially unhealthy (client will handle retries/circuit breaking)
+	if transitionFromEmptyToNonEmpty {
+		needPush = true
+		log.Debugf("UpdateServiceEndpoints: service=%s, shard=%v, endpoints transitioned from empty to non-empty, forcing push", hostname, shard)
+	}
+
 	// CRITICAL: Log endpoint update details for debugging
 	if logPushType {
 		oldHealthyCount := 0
@@ -197,8 +233,8 @@ func (e *EndpointIndex) UpdateServiceEndpoints(
 				newUnhealthyCount++
 			}
 		}
-		log.Debugf("UpdateServiceEndpoints: service=%s, shard=%v, oldEndpoints=%d (healthy=%d, unhealthy=%d), newEndpoints=%d (healthy=%d, unhealthy=%d), needPush=%v, pushType=%v",
-			hostname, shard, len(oldDubboEndpoints), oldHealthyCount, oldUnhealthyCount, len(newDubboEndpoints), newHealthyCount, newUnhealthyCount, needPush, pushType)
+		log.Debugf("UpdateServiceEndpoints: service=%s, shard=%v, oldEndpoints=%d (healthy=%d, unhealthy=%d), newEndpoints=%d (healthy=%d, unhealthy=%d), needPush=%v, pushType=%v, transitionFromEmptyToNonEmpty=%v",
+			hostname, shard, len(oldDubboEndpoints), oldHealthyCount, oldUnhealthyCount, len(newDubboEndpoints), newHealthyCount, newUnhealthyCount, needPush, pushType, transitionFromEmptyToNonEmpty)
 	}
 	if pushType != FullPush && !needPush {
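
For reference, the push decision that the endpointshards.go change implements reduces to the small sketch below; the Endpoint type and main function are illustrative stand-ins, not the actual model.DubboEndpoint API. Any transition into or out of an empty endpoint set now forces a push, so proxyless gRPC clients never keep stale (or missing) targets.

package main

import "fmt"

// Endpoint is a simplified stand-in for model.DubboEndpoint; the real type
// also carries health status, labels, and locality.
type Endpoint struct {
	Key     string
	Healthy bool
}

// requiresPush sketches the empty/non-empty handling above: any transition
// into or out of an empty endpoint set forces a push.
func requiresPush(old, incoming []Endpoint) bool {
	oldEmpty := len(old) == 0 // len() of a nil slice is 0
	newEmpty := len(incoming) == 0
	switch {
	case oldEmpty && !newEmpty:
		return true // endpoints became available: notify clients
	case oldEmpty && newEmpty:
		return false // still nothing to announce
	case newEmpty:
		return true // endpoints disappeared: push an empty assignment
	default:
		// Otherwise the real implementation diffs the two sets (membership,
		// health, labels); omitted in this sketch.
		return false
	}
}

func main() {
	fmt.Println(requiresPush(nil, []Endpoint{{Key: "10.0.0.1:8080", Healthy: false}})) // true
	fmt.Println(requiresPush(nil, nil))                                                // false
}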
diff --git a/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go b/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
index b1a5b022..4ecfff87 100644
--- a/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
+++ b/dubbod/planet/pkg/serviceregistry/kube/controller/endpointslice.go
@@ -19,17 +19,18 @@ package controller
import (
"fmt"
- "github.com/apache/dubbo-kubernetes/pkg/log"
"strings"
"sync"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
+
+ "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/config/visibility"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/hashicorp/go-multierror"
"istio.io/api/annotation"
corev1 "k8s.io/api/core/v1"
@@ -317,13 +318,60 @@ func (esc *endpointSliceController) updateEndpointCacheForSlice(hostName host.Na
 					}
 					if !matched {
-						log.Debugf("updateEndpointCacheForSlice: failed to match EndpointSlice.Port (portNum=%d, portName='%s') with Service %s, using EndpointSlice values",
-							epSlicePortNum, epSlicePortName, svcNamespacedName.Name)
-						// Fallback: use EndpointSlice values
-						servicePortNum = epSlicePortNum
-						targetPortNum = epSlicePortNum
-						if epSlicePortName != "" {
-							portName = epSlicePortName
+						// CRITICAL FIX: If we can't match by Service, try to find ServicePort by port number only
+						// This handles cases where EndpointSlice.Port.Name doesn't match but port number does
+						// This is critical for ensuring portName matches Service.Port.Name
+						for _, kubePort := range kubeSvc.Spec.Ports {
+							if int32(kubePort.Port) == epSlicePortNum {
+								// Found by port number, use Service.Port.Name
+								servicePortNum = int32(kubePort.Port)
+								portName = kubePort.Name
+								// Resolve targetPortNum
+								if kubePort.TargetPort.Type == intstr.Int {
+									targetPortNum = kubePort.TargetPort.IntVal
+								} else if kubePort.TargetPort.Type == intstr.String && pod != nil {
+									for _, container := range pod.Spec.Containers {
+										for _, containerPort := range container.Ports {
+											if containerPort.Name == kubePort.TargetPort.StrVal {
+												targetPortNum = containerPort.ContainerPort
+												break
+											}
+										}
+										if targetPortNum != 0 {
+											break
+										}
+									}
+								}
+								if targetPortNum == 0 {
+									targetPortNum = servicePortNum
+								}
+								matched = true
+								log.Infof("updateEndpointCacheForSlice: matched ServicePort by port number only (servicePort=%d, targetPort=%d, portName='%s') for EndpointSlice.Port (portNum=%d, portName='%s')",
+									servicePortNum, targetPortNum, portName, epSlicePortNum, epSlicePortName)
+								break
+							}
+						}
+
+						if !matched {
+							log.Warnf("updateEndpointCacheForSlice: failed to match EndpointSlice.Port (portNum=%d, portName='%s') with Service %s, using EndpointSlice values (WARNING: portName may not match Service.Port.Name)",
+								epSlicePortNum, epSlicePortName, svcNamespacedName.Name)
+							// Fallback: use EndpointSlice values
+							// CRITICAL: This should rarely happen, but if it does, portName may not match Service.Port.Name
+							// which will cause endpoints to be filtered in BuildClusterLoadAssignment
+							servicePortNum = epSlicePortNum
+							targetPortNum = epSlicePortNum
+							if epSlicePortName != "" {
+								portName = epSlicePortName
+							} else {
+								// If EndpointSlice.Port.Name is also empty, try to get port name from Service by port number
+								for _, kubePort := range kubeSvc.Spec.Ports {
+									if int32(kubePort.Port) == epSlicePortNum {
+										portName = kubePort.Name
+										log.Debugf("updateEndpointCacheForSlice: resolved portName='%s' from Service by port number %d", portName, epSlicePortNum)
+										break
+									}
+								}
+							}
 						}
 					}
 				} else {
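
The fallback added to endpointslice.go resolves a ServicePort by number when the name does not match, including named targetPort lookup against the pod's container ports. A condensed, runnable sketch of that resolution logic (using the upstream Kubernetes types, with placeholder port names and numbers) looks like this:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// resolveTargetPort mirrors the fallback above in sketch form: given a
// ServicePort matched by number only, resolve the numeric target port,
// looking the name up in the pod's container ports when targetPort is a string.
func resolveTargetPort(sp corev1.ServicePort, pod *corev1.Pod) int32 {
	if sp.TargetPort.Type == intstr.Int {
		return sp.TargetPort.IntVal
	}
	if pod != nil {
		for _, c := range pod.Spec.Containers {
			for _, cp := range c.Ports {
				if cp.Name == sp.TargetPort.StrVal {
					return cp.ContainerPort
				}
			}
		}
	}
	// Fall back to the service port itself, as the controller does.
	return sp.Port
}

func main() {
	sp := corev1.ServicePort{Name: "grpc", Port: 8080, TargetPort: intstr.FromString("grpc-port")}
	pod := &corev1.Pod{Spec: corev1.PodSpec{Containers: []corev1.Container{{
		Ports: []corev1.ContainerPort{{Name: "grpc-port", ContainerPort: 17070}},
	}}}}
	fmt.Println(resolveTargetPort(sp, pod)) // 17070
}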
diff --git a/dubbod/planet/pkg/xds/ads.go b/dubbod/planet/pkg/xds/ads.go
index eb66ca30..55a797dc 100644
--- a/dubbod/planet/pkg/xds/ads.go
+++ b/dubbod/planet/pkg/xds/ads.go
@@ -85,13 +85,16 @@ func (s *DiscoveryServer) StartPush(req *model.PushRequest) {
 }
 func (s *DiscoveryServer) AdsPushAll(req *model.PushRequest) {
+	connectedEndpoints := s.adsClientCount()
 	if !req.Full {
-		log.Infof("XDS: :%d Version:%s",
-			s.adsClientCount(), req.Push.PushVersion)
+		// Incremental push - only log connection count and version
+		log.Infof("XDS: Incremental Push to %d Endpoints, Version:%s",
+			connectedEndpoints, req.Push.PushVersion)
 	} else {
+		// Full push - log services, connections, and version
 		totalService := len(req.Push.GetAllServices())
 		log.Infof("XDS: Pushing Services:%d ConnectedEndpoints:%d Version:%s",
-			totalService, s.adsClientCount(), req.Push.PushVersion)
+			totalService, connectedEndpoints, req.Push.PushVersion)
 		if req.ConfigsUpdated == nil {
 			req.ConfigsUpdated = make(sets.Set[model.ConfigKey])
@@ -331,19 +334,38 @@ func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *C
 	shouldRespond, delta := xds.ShouldRespond(con.proxy, con.ID(), req)
-	if !shouldRespond {
-		// Don't log ACK/ignored/expired nonce requests to reduce noise
-		// These are normal and expected, logging them creates too much noise
-		return nil
-	}
-
-	// Log requested resource names only for requests that will be processed
+	// CRITICAL: Log NEW requests (will respond) at INFO level so every grpcurl request is visible
+	// This ensures every grpcurl request triggers visible xDS logs in control plane
+	// Format: "LDS: REQ node-xxx resources:1 nonce:abc123 [resource1, resource2] (will respond)"
 	resourceNamesStr := ""
 	if len(req.ResourceNames) > 0 {
-		resourceNamesStr = fmt.Sprintf(" [%s]", strings.Join(req.ResourceNames, ", "))
+		// Show resource names for better debugging
+		if len(req.ResourceNames) <= 10 {
+			resourceNamesStr = fmt.Sprintf(" [%s]", strings.Join(req.ResourceNames, ", "))
+		} else {
+			// If too many resources, show first 5 and count
+			resourceNamesStr = fmt.Sprintf(" [%s, ... and %d more]", strings.Join(req.ResourceNames[:5], ", "), len(req.ResourceNames)-5)
+		}
+	} else {
+		resourceNamesStr = " [wildcard]"
+	}
+
+	if shouldRespond {
+		// Log NEW requests at INFO level - these are triggered by grpcurl requests
+		// This makes it easy to see when a grpcurl request triggers xDS configuration
+		log.Infof("%s: REQ %s resources:%d nonce:%s%s (will respond)", stype,
+			con.ID(), len(req.ResourceNames), req.ResponseNonce, resourceNamesStr)
+	} else {
+		// Log ACK/ignored requests at DEBUG level to reduce noise
+		// These are normal XDS protocol ACKs, not new requests from grpcurl
+		log.Debugf("%s: REQ %s resources:%d nonce:%s%s (ACK/ignored)", stype,
+			con.ID(), len(req.ResourceNames), req.ResponseNonce, resourceNamesStr)
+	}
+
+	if !shouldRespond {
+		// Don't process ACK/ignored/expired nonce requests
+		return nil
 	}
-	log.Infof("%s: REQ %s resources:%d nonce:%s%s", stype,
-		con.ID(), len(req.ResourceNames), req.ResponseNonce, resourceNamesStr)
 	// CRITICAL FIX: For proxyless gRPC, if client sends wildcard (empty ResourceNames) after receiving specific resources,
 	// this is likely an ACK and we should NOT push all resources again
@@ -359,12 +381,19 @@ func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *C
 		return nil
 	}
-	// Log PUSH request BEFORE calling pushXds
+	// Log PUSH request BEFORE calling pushXds with detailed information
+	// This helps track what resources are being requested for each push
+	pushResourceNamesStr := ""
 	if len(req.ResourceNames) > 0 {
-		log.Infof("%s: PUSH request for node:%s resources:%d", stype, con.ID(), len(req.ResourceNames))
+		if len(req.ResourceNames) <= 10 {
+			pushResourceNamesStr = fmt.Sprintf(" resources:%d [%s]", len(req.ResourceNames), strings.Join(req.ResourceNames, ", "))
+		} else {
+			pushResourceNamesStr = fmt.Sprintf(" resources:%d [%s, ... and %d more]", len(req.ResourceNames), strings.Join(req.ResourceNames[:5], ", "), len(req.ResourceNames)-5)
+		}
 	} else {
-		log.Infof("%s: PUSH request for node:%s", stype, con.ID())
+		pushResourceNamesStr = " resources:0 [wildcard]"
 	}
+	log.Infof("%s: PUSH request for node:%s%s", stype, con.ID(), pushResourceNamesStr)
 	// Don't set Forced=true for regular proxy requests to avoid unnecessary ServiceTargets recomputation
 	// Only set Forced for debug requests or when explicitly needed
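
The truncated resource-name formatting used in the new REQ/PUSH log lines follows this simple pattern; the helper below is a standalone sketch, not the exact code in ads.go:

package main

import (
	"fmt"
	"strings"
)

// formatResources mirrors the log formatting above: show all names when there
// are few, otherwise the first five plus a count, and mark wildcard (empty)
// requests explicitly.
func formatResources(names []string) string {
	switch {
	case len(names) == 0:
		return " [wildcard]"
	case len(names) <= 10:
		return fmt.Sprintf(" [%s]", strings.Join(names, ", "))
	default:
		return fmt.Sprintf(" [%s, ... and %d more]", strings.Join(names[:5], ", "), len(names)-5)
	}
}

func main() {
	fmt.Println(formatResources(nil))
	fmt.Println(formatResources([]string{"outbound|8080||producer.default.svc.cluster.local"}))
}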
diff --git a/dubbod/planet/pkg/xds/eds.go b/dubbod/planet/pkg/xds/eds.go
index e9c1665c..0a734f8a 100644
--- a/dubbod/planet/pkg/xds/eds.go
+++ b/dubbod/planet/pkg/xds/eds.go
@@ -47,12 +47,38 @@ func (s *DiscoveryServer) EDSUpdate(shard model.ShardKey, serviceName string, na
 	dubboEndpoints []*model.DubboEndpoint,
 ) {
 	pushType := s.Env.EndpointIndex.UpdateServiceEndpoints(shard, serviceName, namespace, dubboEndpoints, true)
+	// CRITICAL FIX: Always push EDS updates when endpoints change state
+	// This ensures clients receive updates when:
+	// 1. Endpoints become available (from empty to non-empty)
+	// 2. Endpoints become unavailable (from non-empty to empty)
+	// 3. Endpoint health status changes
+	// This is critical for proxyless gRPC to prevent "weighted-target: no targets to pick from" errors
 	if pushType == model.IncrementalPush || pushType == model.FullPush {
+		log.Infof("EDSUpdate: service %s/%s triggering %v push (endpoints=%d)", namespace, serviceName, pushType, len(dubboEndpoints))
 		s.ConfigUpdate(&model.PushRequest{
 			Full:           pushType == model.FullPush,
 			ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: serviceName, Namespace: namespace}),
 			Reason:         model.NewReasonStats(model.EndpointUpdate),
 		})
+	} else if pushType == model.NoPush {
+		// CRITICAL: Even when UpdateServiceEndpoints returns NoPush, we may still need to push
+		// This happens when:
+		// 1. All old endpoints were unhealthy and new endpoints are also unhealthy (health status didn't change)
+		// 2. But we still need to notify clients about the current state
+		// For proxyless gRPC, we should push even if endpoints are empty to ensure clients know the state
+		if len(dubboEndpoints) == 0 {
+			log.Infof("EDSUpdate: service %s/%s endpoints became empty (NoPush), forcing push to clear client cache", namespace, serviceName)
+			s.ConfigUpdate(&model.PushRequest{
+				Full:           false, // Incremental push
+				ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: serviceName, Namespace: namespace}),
+				Reason:         model.NewReasonStats(model.EndpointUpdate),
+			})
+		} else {
+			// Endpoints exist but NoPush was returned - this means health status didn't change
+			// For proxyless gRPC, we should still push to ensure clients have the latest endpoint info
+			// This is especially important when endpoints become available after being empty
+			log.Debugf("EDSUpdate: service %s/%s has %d endpoints but NoPush returned (health status unchanged), skipping push", namespace, serviceName, len(dubboEndpoints))
+		}
 	}
 }
@@ -153,23 +179,39 @@ func (eds *EdsGenerator) buildEndpoints(proxy *model.Proxy,
 			}
 		}
+		// CRITICAL FIX: For proxyless gRPC, we MUST always process all watched clusters
+		// This ensures clients receive EDS updates even when endpoints become available or unavailable
+		// For non-proxyless (Istio behavior), we skip clusters that are not in edsUpdatedServices for incremental pushes
 		if edsUpdatedServices != nil {
 			if _, ok := edsUpdatedServices[hostname]; !ok {
-				// Cluster was not updated, skip recomputing. This happens when we get an incremental update for a
-				// specific Hostname. On connect or for full push edsUpdatedServices will be empty.
+				// Cluster was not in edsUpdatedServices
 				if !serviceWasUpdated {
-					continue
+					// CRITICAL: For proxyless gRPC, always process all clusters to ensure EDS is up-to-date
+					// This is essential because proxyless gRPC clients need to know endpoint state changes
+					// even if the service wasn't explicitly updated in this push
+					if proxy.IsProxylessGrpc() {
+						log.Debugf("buildEndpoints: proxyless gRPC, processing cluster %s even though not in edsUpdatedServices (hostname=%s)", clusterName, hostname)
+						serviceWasUpdated = true
+					} else {
+						// For non-proxyless, skip if not updated (Istio behavior)
+						continue
+					}
 				}
 			} else {
 				serviceWasUpdated = true
 			}
+		} else {
+			// If edsUpdatedServices is nil, this is a full push - process all clusters
+			serviceWasUpdated = true
 		}
 		builder := endpoints.NewEndpointBuilder(clusterName, proxy, req.Push)
 		if builder == nil {
 			continue
 		}
-		// CRITICAL FIX: For incremental pushes (Full=false), we must always regenerate EDS
+		// CRITICAL FIX: For proxyless gRPC, we must always regenerate EDS when serviceWasUpdated is true
+		// to ensure empty endpoints are sent when they become unavailable, and endpoints are sent when available.
+		// For incremental pushes (Full=false), we must always regenerate EDS
 		// if ConfigsUpdated contains this service, because endpoint health status may have changed.
 		// The cache may contain stale empty endpoints from when the endpoint was unhealthy.
 		// For full pushes, we can use cache if the service was not updated.
@@ -181,11 +223,19 @@ func (eds *EdsGenerator) buildEndpoints(proxy *model.Proxy,
 			for ckey := range req.ConfigsUpdated {
 				if ckey.Kind == kind.ServiceEntry && ckey.Name == hostname {
 					shouldRegenerate = true
+					log.Debugf("buildEndpoints: forcing regeneration for cluster %s (hostname=%s) due to ConfigsUpdated", clusterName, hostname)
 					break
 				}
 			}
 		}
+		// CRITICAL: For proxyless gRPC, if serviceWasUpdated is true, we must regenerate
+		// even if cache exists, to ensure endpoints are always up-to-date
+		// This is critical for preventing "weighted-target: no targets to pick from" errors
+		if shouldRegenerate && proxy.IsProxylessGrpc() {
+			log.Debugf("buildEndpoints: proxyless gRPC, forcing regeneration for cluster %s (hostname=%s, serviceWasUpdated=%v)", clusterName, hostname, serviceWasUpdated)
+		}
+
 		// Try to get from cache only if we don't need to regenerate
 		if !shouldRegenerate && eds.Cache != nil {
 			cachedEndpoint := eds.Cache.Get(builder)
@@ -198,22 +248,41 @@ func (eds *EdsGenerator) buildEndpoints(proxy *model.Proxy,
 		// generate eds from beginning
 		l := builder.BuildClusterLoadAssignment(eds.EndpointIndex)
-		// CRITICAL FIX: Even if endpoints are empty (l == nil), we should still create an empty ClusterLoadAssignment
-		// to ensure the client receives the EDS response. This is necessary for proxyless gRPC clients
-		// to know the endpoint state, even if it's empty initially.
-		// Note: BuildClusterLoadAssignment returns nil when no endpoints are found,
-		// but we still need to send an empty ClusterLoadAssignment to the client.
+		// NOTE: BuildClusterLoadAssignment never returns nil - it always returns either:
+		// 1. A valid ClusterLoadAssignment with endpoints, or
+		// 2. An empty ClusterLoadAssignment (via buildEmptyClusterLoadAssignment)
+		// The empty ClusterLoadAssignment has an explicit empty Endpoints list,
+		// which is critical for proxyless gRPC clients to know the endpoint state
+		// and prevent "weighted-target: no targets to pick from" errors.
+		// The nil check below is defensive programming, but should never be true in practice.
 		if l == nil {
-			// Build an empty ClusterLoadAssignment for this cluster
-			// Use the same logic as endpoint_builder.go:buildEmptyClusterLoadAssignment
+			// Defensive: Build an empty ClusterLoadAssignment if somehow nil is returned
+			// This should never happen, but ensures we always send a valid response
 			l = &endpoint.ClusterLoadAssignment{
 				ClusterName: clusterName,
+				Endpoints:   []*endpoint.LocalityLbEndpoints{}, // Empty endpoints list
 			}
+			log.Warnf("buildEndpoints: BuildClusterLoadAssignment returned nil for cluster %s (hostname: %s), created empty CLA", clusterName, hostname)
 		}
 		regenerated++
+		// CRITICAL: Log EDS push details for proxyless gRPC to help diagnose "weighted-target: no targets to pick from" errors
 		if len(l.Endpoints) == 0 {
 			empty++
+			if proxy.IsProxylessGrpc() {
+				log.Infof("buildEndpoints: proxyless gRPC, pushing empty EDS for cluster %s (hostname=%s, serviceWasUpdated=%v, shouldRegenerate=%v)",
+					clusterName, hostname, serviceWasUpdated, shouldRegenerate)
+			}
+		} else {
+			// Count total endpoints in the ClusterLoadAssignment
+			totalEndpointsInCLA := 0
+			for _, localityLbEp := range l.Endpoints {
+				totalEndpointsInCLA += len(localityLbEp.LbEndpoints)
+			}
+			if proxy.IsProxylessGrpc() {
+				log.Infof("buildEndpoints: proxyless gRPC, pushing EDS for cluster %s (hostname=%s, endpoints=%d, serviceWasUpdated=%v, shouldRegenerate=%v)",
+					clusterName, hostname, totalEndpointsInCLA, serviceWasUpdated, shouldRegenerate)
+			}
 		}
 		resource := &discovery.Resource{
 			Name: l.ClusterName,
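
The empty-assignment shape that eds.go relies on can be sketched directly with the go-control-plane types; the cluster name below is a placeholder, and claEndpointCount mirrors the totalEndpointsInCLA accounting used in the new log lines:

package main

import (
	"fmt"

	endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
)

// emptyCLA mirrors the defensive fallback above in sketch form: an assignment
// whose Endpoints list is explicitly empty (not nil), so a proxyless gRPC
// client replaces its cached targets instead of keeping stale ones.
func emptyCLA(clusterName string) *endpointv3.ClusterLoadAssignment {
	return &endpointv3.ClusterLoadAssignment{
		ClusterName: clusterName,
		Endpoints:   []*endpointv3.LocalityLbEndpoints{}, // explicitly empty, not nil
	}
}

// claEndpointCount sums LbEndpoints across all localities in an assignment.
func claEndpointCount(cla *endpointv3.ClusterLoadAssignment) int {
	n := 0
	for _, loc := range cla.Endpoints {
		n += len(loc.LbEndpoints)
	}
	return n
}

func main() {
	cla := emptyCLA("outbound|8080||producer.default.svc.cluster.local")
	fmt.Println(cla.ClusterName, claEndpointCount(cla)) // prints the cluster name and 0
}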
diff --git a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
index e21f2f09..37878bb4 100644
--- a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
+++ b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
@@ -18,6 +18,8 @@
package endpoints
import (
+ "fmt"
+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/networking/util"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
@@ -85,7 +87,9 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
 	// Get endpoints from the endpoint index
 	shards, ok := endpointIndex.ShardsForService(string(b.hostname), b.service.Attributes.Namespace)
 	if !ok {
-		log.Warnf("BuildClusterLoadAssignment: no shards found for service %s in namespace %s (cluster=%s, port=%d, svcPort.Name='%s', svcPort.Port=%d)",
+		// CRITICAL: Log at INFO level for proxyless gRPC to help diagnose "weighted-target: no targets to pick from" errors
+		log.Infof("BuildClusterLoadAssignment: no shards found for service %s in namespace %s (cluster=%s, port=%d, svcPort.Name='%s', svcPort.Port=%d). "+
+			"This usually means endpoints are not yet available or service is not registered.",
 			b.hostname, b.service.Attributes.Namespace, b.clusterName, b.port, svcPort.Name, svcPort.Port)
 		return buildEmptyClusterLoadAssignment(b.clusterName)
 	}
@@ -119,16 +123,46 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
 	var lbEndpoints []*endpoint.LbEndpoint
 	var filteredCount int
 	var totalEndpoints int
+	var portNameMismatchCount int
+	var unhealthyCount int
+	var buildFailedCount int
+
+	// CRITICAL: Log all endpoint ServicePortNames for debugging port name matching issues
+	allServicePortNames := make(map[string]int)
+	for _, eps := range shards.Shards {
+		for _, ep := range eps {
+			allServicePortNames[ep.ServicePortName]++
+		}
+	}
+	if len(allServicePortNames) > 0 {
+		portNamesList := make([]string, 0, len(allServicePortNames))
+		for name, count := range allServicePortNames {
+			portNamesList = append(portNamesList, fmt.Sprintf("'%s'(%d)", name, count))
+		}
+		log.Infof("BuildClusterLoadAssignment: service %s port %d (svcPort.Name='%s'), found endpoints with ServicePortNames: %v",
+			b.hostname, b.port, svcPort.Name, portNamesList)
+	}
 	for _, eps := range shards.Shards {
 		for _, ep := range eps {
 			totalEndpoints++
 			// Filter by port name
+			// CRITICAL: According to Istio's implementation, we must match ServicePortName exactly
+			// However, if ServicePortName is empty, we should still include the endpoint if there's only one port
+			// This handles cases where EndpointSlice doesn't have port name but Service does
 			if ep.ServicePortName != svcPort.Name {
-				filteredCount++
-				log.Debugf("BuildClusterLoadAssignment: filtering out endpoint %s (port name mismatch: '%s' != '%s')",
-					ep.FirstAddressOrNil(), ep.ServicePortName, svcPort.Name)
-				continue
+				// Special case: if ServicePortName is empty and service has only one port, include it
+				if ep.ServicePortName == "" && len(b.service.Ports) == 1 {
+					log.Debugf("BuildClusterLoadAssignment: including endpoint %s with empty ServicePortName (service has only one port '%s')",
+						ep.FirstAddressOrNil(), svcPort.Name)
+					// Continue processing this endpoint
+				} else {
+					portNameMismatchCount++
+					filteredCount++
+					log.Warnf("BuildClusterLoadAssignment: filtering out endpoint %s (port name mismatch: ep.ServicePortName='%s' != svcPort.Name='%s', EndpointPort=%d)",
+						ep.FirstAddressOrNil(), ep.ServicePortName, svcPort.Name, ep.EndpointPort)
+					continue
+				}
 			}
 			// Filter by subset labels if subset is specified
@@ -145,16 +179,19 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
 			// as the client will handle connection failures. However, this is controlled by the service
 			// configuration (publishNotReadyAddresses).
 			if !b.service.SupportsUnhealthyEndpoints() && ep.HealthStatus == model.UnHealthy {
-				log.Debugf("BuildClusterLoadAssignment: filtering out unhealthy endpoint %s", ep.FirstAddressOrNil())
+				unhealthyCount++
 				filteredCount++
+				log.Debugf("BuildClusterLoadAssignment: filtering out unhealthy endpoint %s (HealthStatus=%v, SupportsUnhealthyEndpoints=%v)",
+					ep.FirstAddressOrNil(), ep.HealthStatus, b.service.SupportsUnhealthyEndpoints())
 				continue
 			}
 			// Build LbEndpoint
 			lbEp := b.buildLbEndpoint(ep)
 			if lbEp == nil {
-				log.Debugf("BuildClusterLoadAssignment: buildLbEndpoint returned nil for endpoint %s", ep.FirstAddressOrNil())
+				buildFailedCount++
 				filteredCount++
+				log.Debugf("BuildClusterLoadAssignment: buildLbEndpoint returned nil for endpoint %s", ep.FirstAddressOrNil())
 				continue
 			}
 			lbEndpoints = append(lbEndpoints, lbEp)
@@ -162,8 +199,17 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
 	}
 	if len(lbEndpoints) == 0 {
-		log.Debugf("BuildClusterLoadAssignment: no endpoints found for cluster %s (hostname=%s, port=%d, totalEndpoints=%d, filteredCount=%d)",
-			b.clusterName, b.hostname, b.port, totalEndpoints, filteredCount)
+		// CRITICAL: Log at WARN level for proxyless gRPC to help diagnose "weighted-target: no targets to pick from" errors
+		logLevel := log.Debugf
+		// For proxyless gRPC, log empty endpoints at INFO level to help diagnose connection issues
+		// This helps identify when endpoints are not available vs when they're filtered out
+		if totalEndpoints > 0 {
+			logLevel = log.Warnf // If endpoints exist but were filtered, this is a warning
+		} else {
+			logLevel = log.Infof // If no endpoints exist at all, this is informational
+		}
+		logLevel("BuildClusterLoadAssignment: no endpoints found for cluster %s (hostname=%s, port=%d, svcPort.Name='%s', svcPort.Port=%d, totalEndpoints=%d, filteredCount=%d, portNameMismatch=%d, unhealthy=%d, buildFailed=%d)",
+			b.clusterName, b.hostname, b.port, svcPort.Name, svcPort.Port, totalEndpoints, filteredCount, portNameMismatchCount, unhealthyCount, buildFailedCount)
 		return buildEmptyClusterLoadAssignment(b.clusterName)
 	}
@@ -243,8 +289,12 @@ func (b *EndpointBuilder) buildLbEndpoint(ep *model.DubboEndpoint) *endpoint.LbE
 }
 func buildEmptyClusterLoadAssignment(clusterName string) *endpoint.ClusterLoadAssignment {
+	// CRITICAL FIX: Following Istio's pattern, empty ClusterLoadAssignment should have empty Endpoints list
+	// This ensures gRPC proxyless clients receive the update and clear their endpoint cache,
+	// preventing "weighted-target: no targets to pick from" errors
 	return &endpoint.ClusterLoadAssignment{
 		ClusterName: clusterName,
+		Endpoints:   []*endpoint.LocalityLbEndpoints{}, // Explicitly empty, not nil
 	}
 }
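
The port-name filter in BuildClusterLoadAssignment reduces to the following decision; this is a sketch over plain strings rather than the real DubboEndpoint and ServicePort types:

package main

import "fmt"

// includeByPortName condenses the filter above: an endpoint is kept when its
// ServicePortName matches the service port name exactly, or when the name is
// empty and the Service exposes exactly one port.
func includeByPortName(epPortName, svcPortName string, svcPortCount int) bool {
	if epPortName == svcPortName {
		return true
	}
	return epPortName == "" && svcPortCount == 1
}

func main() {
	fmt.Println(includeByPortName("grpc", "grpc", 2)) // true: exact match
	fmt.Println(includeByPortName("", "grpc", 1))     // true: single-port special case
	fmt.Println(includeByPortName("http", "grpc", 2)) // false: counted as portNameMismatch
}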
diff --git a/dubbod/planet/pkg/xds/xdsgen.go b/dubbod/planet/pkg/xds/xdsgen.go
index 58971e8f..5ba03709 100644
--- a/dubbod/planet/pkg/xds/xdsgen.go
+++ b/dubbod/planet/pkg/xds/xdsgen.go
@@ -34,6 +34,8 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/xds"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+	listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+	hcmv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
 	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
 )
@@ -160,6 +162,36 @@ func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req
 						}
 					}
 				}
+			} else if w.TypeUrl == v3.RouteType {
+				// CRITICAL FIX: Extract route names from LDS response for RDS wildcard requests
+				// RDS is not a wildcard type, so when client sends empty ResourceNames,
+				// we need to extract route names from LDS listeners that reference RDS
+				ldsWatched := con.proxy.GetWatchedResource(v3.ListenerType)
+				if ldsWatched != nil && ldsWatched.NonceSent != "" {
+					// LDS has been sent, extract route names from it
+					log.Debugf("pushXds: RDS wildcard request, extracting route names from LDS")
+					// Create a temporary request to generate LDS and extract route names
+					ldsReq := &model.PushRequest{
+						Full:   true,
+						Push:   req.Push,
+						Reason: model.NewReasonStats(model.DependentResource),
+						Start:  con.proxy.LastPushTime,
+						Forced: false,
+					}
+					ldsGen := s.findGenerator(v3.ListenerType, con)
+					if ldsGen != nil {
+						ldsRes, _, _ := ldsGen.Generate(con.proxy, ldsWatched, ldsReq)
+						if len(ldsRes) > 0 {
+							routeNames := extractRouteNamesFromLDS(ldsRes)
+							if len(routeNames) > 0 {
+								w.ResourceNames = sets.New(routeNames...)
+								requestedResourceNames = sets.New[string]()
+								requestedResourceNames.InsertAll(routeNames...)
+								log.Debugf("pushXds: extracted %d route names from LDS: %v", len(routeNames), routeNames)
+							}
+						}
+					}
+				}
 			}
 		} else if len(w.ResourceNames) > 0 {
 			// Specific resource request
@@ -317,29 +349,44 @@ func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req
 	case !req.Full:
 	default:
 		// Log format matches Istio: "LDS: PUSH for node:xxx resources:1 size:342B"
+		// CRITICAL: Always log resource names for better debugging, especially for proxyless gRPC
 		resourceNamesStr := ""
-		if len(res) > 0 && len(res) <= 5 {
-			// Log resource names if there are few resources (for debugging)
-			names := make([]string, 0, len(res))
-			for _, r := range res {
-				names = append(names, r.Name)
+		if len(res) > 0 {
+			if len(res) <= 10 {
+				// Log all resource names if there are few resources
+				names := make([]string, 0, len(res))
+				for _, r := range res {
+					names = append(names, r.Name)
+				}
+				resourceNamesStr = fmt.Sprintf(" [%s]", strings.Join(names, ", "))
+			} else {
+				// If too many resources, show first 5 and count
+				names := make([]string, 0, 5)
+				for i := 0; i < 5 && i < len(res); i++ {
+					names = append(names, res[i].Name)
+				}
+				resourceNamesStr = fmt.Sprintf(" [%s, ... and %d more]", strings.Join(names, ", "), len(res)-5)
 			}
-			resourceNamesStr = fmt.Sprintf(" [%s]", strings.Join(names, ", "))
+		} else {
+			resourceNamesStr = " [empty]"
 		}
 		log.Infof("%s: %s for node:%s resources:%d size:%s%s%s",
 			v3.GetShortType(w.TypeUrl), ptype, con.proxy.ID, len(res), util.ByteCount(ResourceSize(res)), info, resourceNamesStr)
 	}
 	// CRITICAL FIX: For proxyless gRPC, after pushing LDS with outbound listeners,
-	// automatically trigger CDS push for the referenced clusters ONLY if this is a direct request push
-	// (not a push from pushConnection which would cause loops)
-	// Only auto-push if CDS is not already being watched by the client (client will request it naturally)
+	// automatically trigger CDS and RDS push for the referenced clusters and routes
+	// ONLY if this is a direct request push (not a push from pushConnection which would cause loops)
+	// Only auto-push if CDS/RDS is not already being watched by the client (client will request it naturally)
 	if w.TypeUrl == v3.ListenerType && con.proxy.IsProxylessGrpc() && len(res) > 0 {
-		// Only auto-push CDS if this is a direct request (not a full push from pushConnection)
+		// Only auto-push CDS/RDS if this is a direct request (not a full push from pushConnection)
 		// Check if this push was triggered by a direct client request using IsRequest()
 		isDirectRequest := req.IsRequest()
 		if isDirectRequest {
 			clusterNames := extractClusterNamesFromLDS(res)
+			routeNames := extractRouteNamesFromLDS(res)
+
+			// Auto-push CDS for referenced clusters
 			if len(clusterNames) > 0 {
 				cdsWatched := con.proxy.GetWatchedResource(v3.ClusterType)
 				// Only auto-push CDS if client hasn't already requested it
@@ -365,23 +412,95 @@ func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req
 					log.Debugf("pushXds: LDS push completed, client already requested CDS, skipping auto-push")
 				}
 			}
+
+			// CRITICAL FIX: Auto-push RDS for referenced routes
+			// This is essential for gRPC proxyless - client needs RDS to route traffic correctly
+			if len(routeNames) > 0 {
+				rdsWatched := con.proxy.GetWatchedResource(v3.RouteType)
+				// Only auto-push RDS if client hasn't already requested it
+				if rdsWatched == nil || rdsWatched.ResourceNames == nil || len(rdsWatched.ResourceNames) == 0 {
+					// Client hasn't requested RDS yet, auto-push to ensure client gets the route config
+					con.proxy.NewWatchedResource(v3.RouteType, routeNames)
+					log.Debugf("pushXds: LDS push completed, auto-pushing RDS for routes: %v", routeNames)
+					// Trigger RDS push directly without going through pushConnection to avoid loops
+					rdsReq := &model.PushRequest{
+						Full:   true,
+						Push:   req.Push,
+						Reason: model.NewReasonStats(model.ProxyRequest),
+						Start:  con.proxy.LastPushTime,
+						Forced: false,
+					}
+					if err := s.pushXds(con, con.proxy.GetWatchedResource(v3.RouteType), rdsReq); err != nil {
+						log.Warnf("pushXds: failed to push RDS after LDS: %v", err)
+					}
+				} else {
+					// Check if any route names are missing from the watched set
+					existingNames := rdsWatched.ResourceNames
+					if existingNames == nil {
+						existingNames = sets.New[string]()
+					}
+					hasNewRoutes := false
+					for _, rn := range routeNames {
+						if !existingNames.Contains(rn) {
+							hasNewRoutes = true
+							break
+						}
+					}
+					if hasNewRoutes {
+						// Update RDS watched resource to include the new route names
+						con.proxy.UpdateWatchedResource(v3.RouteType, func(wr *model.WatchedResource) *model.WatchedResource {
+							if wr == nil {
+								wr = &model.WatchedResource{TypeUrl: v3.RouteType, ResourceNames: sets.New[string]()}
+							}
+							existingNames := wr.ResourceNames
+							if existingNames == nil {
+								existingNames = sets.New[string]()
+							}
+							for _, rn := range routeNames {
+								existingNames.Insert(rn)
+							}
+							wr.ResourceNames = existingNames
+							return wr
+						})
+						log.Debugf("pushXds: LDS push completed, updating RDS watched resource with new routes: %v", routeNames)
+						// Trigger RDS push for the new routes
+						rdsReq := &model.PushRequest{
+							Full:   true,
+							Push:   req.Push,
+							Reason: model.NewReasonStats(model.DependentResource),
+							Start:  con.proxy.LastPushTime,
+							Forced: false,
+						}
+						if err := s.pushXds(con, con.proxy.GetWatchedResource(v3.RouteType), rdsReq); err != nil {
+							log.Warnf("pushXds: failed to push RDS after LDS: %v", err)
+						}
+					} else {
+						log.Debugf("pushXds: LDS push completed, RDS routes already watched: %v", routeNames)
+					}
+				}
+			}
 		}
 	}
 	// CRITICAL FIX: For proxyless gRPC, after pushing CDS with EDS clusters,
-	// automatically trigger EDS push for the referenced endpoints
-	// This is necessary for load balancing - client needs EDS to discover all available endpoints
+	// we should NOT automatically push EDS before the client requests it.
+	// The gRPC xDS client will automatically request EDS after receiving CDS with EDS clusters.
+	// If we push EDS before the client requests it, the client will receive the response
+	// without having a state, causing "no state exists for it" warnings and "weighted-target: no targets to pick from" errors.
+	// Instead, we should:
+	// 1. Update the watched resource to include the EDS cluster names (so when client requests EDS, we know what to send)
+	// 2. Wait for the client to request EDS naturally
+	// 3. When the client requests EDS, we will push it with the correct state
 	if w.TypeUrl == v3.ClusterType && con.proxy.IsProxylessGrpc() && len(res) > 0 {
 		// Extract EDS cluster names from CDS resources
 		edsClusterNames := extractEDSClusterNamesFromCDS(res)
 		if len(edsClusterNames) > 0 {
 			edsWatched := con.proxy.GetWatchedResource(v3.EndpointType)
-			shouldPushEDS := false
 			if edsWatched == nil {
-				// EDS not watched yet, create watched resource
+				// EDS not watched yet, create watched resource with cluster names
+				// This ensures that when the client requests EDS, we know which clusters to send
 				con.proxy.NewWatchedResource(v3.EndpointType, edsClusterNames)
-				shouldPushEDS = true
-				log.Debugf("pushXds: CDS push completed, auto-pushing EDS for clusters: %v", edsClusterNames)
+				log.Debugf("pushXds: CDS push completed, created EDS watched resource for clusters: %v (waiting for client request)", edsClusterNames)
 			} else {
 				// Check if any cluster names are missing from the watched set
 				existingNames := edsWatched.ResourceNames
@@ -411,27 +530,14 @@ func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req
 						wr.ResourceNames = existingNames
 						return wr
 					})
-					shouldPushEDS = true
-					log.Debugf("pushXds: CDS push completed, updating EDS watched resource with new clusters: %v", edsClusterNames)
+					log.Debugf("pushXds: CDS push completed, updated EDS watched resource with new clusters: %v (waiting for client request)", edsClusterNames)
 				} else {
 					log.Debugf("pushXds: CDS push completed, EDS clusters already watched: %v", edsClusterNames)
 				}
 			}
-
-			// Only push EDS if we have new clusters to push
-			if shouldPushEDS {
-				// Trigger EDS push directly without going through pushConnection to avoid loops
-				edsReq := &model.PushRequest{
-					Full:   true,
-					Push:   req.Push,
-					Reason: model.NewReasonStats(model.DependentResource),
-					Start:  con.proxy.LastPushTime,
-					Forced: true, // Force EDS push to ensure endpoints are available for load balancing
-				}
-				if err := s.pushXds(con, con.proxy.GetWatchedResource(v3.EndpointType), edsReq); err != nil {
-					log.Warnf("pushXds: failed to push EDS after CDS: %v", err)
-				}
-			}
+			// CRITICAL: Do NOT push EDS here - wait for the client to request it naturally
+			// The gRPC xDS client will automatically request EDS after receiving CDS with EDS clusters
+			// Pushing EDS before the client requests it causes "no state exists" warnings
 		}
 	}
@@ -487,6 +593,64 @@ func extractClusterNamesFromLDS(listeners model.Resources) []string {
 	return clusterNames.UnsortedList()
 }
+// extractRouteNamesFromLDS extracts route names referenced in LDS listener resources
+// For outbound listeners with ApiListener, the route name is the RouteConfigName from Rds config
+// Route name format is the same as cluster name: "outbound|port||hostname"
+func extractRouteNamesFromLDS(listeners model.Resources) []string {
+	routeNames := sets.New[string]()
+	for _, r := range listeners {
+		// Unmarshal the listener resource to extract route names
+		ll := &listener.Listener{}
+		if err := r.Resource.UnmarshalTo(ll); err != nil {
+			log.Debugf("extractRouteNamesFromLDS: failed to unmarshal listener %s: %v", r.Name, err)
+			continue
+		}
+
+		// Check if this listener has ApiListener (used by gRPC proxyless for outbound)
+		if ll.ApiListener != nil && ll.ApiListener.ApiListener != nil {
+			// Unmarshal ApiListener to get HttpConnectionManager
+			hcm := &hcmv3.HttpConnectionManager{}
+			if err := ll.ApiListener.ApiListener.UnmarshalTo(hcm); err != nil {
+				log.Debugf("extractRouteNamesFromLDS: failed to unmarshal ApiListener for listener %s: %v", r.Name, err)
+				continue
+			}
+
+			// Check if HttpConnectionManager uses RDS
+			if hcm.RouteSpecifier != nil {
+				if rds, ok := hcm.RouteSpecifier.(*hcmv3.HttpConnectionManager_Rds); ok && rds.Rds != nil {
+					// Found RDS reference, extract route name
+					routeName := rds.Rds.RouteConfigName
+					if routeName != "" {
+						routeNames.Insert(routeName)
+						log.Debugf("extractRouteNamesFromLDS: found route name %s from listener %s", routeName, r.Name)
+					}
+				}
+			}
+		}
+
+		// Fallback: If ApiListener extraction failed, try to extract from listener name
+		// This is a backup method in case the listener structure is different
+		// Outbound listener format: "hostname:port" -> route: "outbound|port||hostname"
+		listenerName := r.Name
+		if strings.Contains(listenerName, ":") && !strings.HasPrefix(listenerName, "xds.dubbo.apache.org/grpc/lds/inbound/") {
+			// This is an outbound listener
+			parts := strings.Split(listenerName, ":")
+			if len(parts) == 2 {
+				hostname := parts[0]
+				portStr := parts[1]
+				port, err := strconv.Atoi(portStr)
+				if err == nil {
+					// Build route name: outbound|port||hostname (same format as cluster name)
+					routeName := model.BuildSubsetKey(model.TrafficDirectionOutbound, "", host.Name(hostname), port)
+					routeNames.Insert(routeName)
+					log.Debugf("extractRouteNamesFromLDS: extracted route name %s from listener name %s", routeName, listenerName)
+				}
+			}
+		}
+	}
+	return routeNames.UnsortedList()
+}
+
 func ResourceSize(r model.Resources) int {
 	size := 0
 	for _, r := range r {
diff --git a/tests/grpc-proxyless/consumer/main.go b/tests/grpc-proxyless/consumer/main.go
index 5e3e665d..4b8476ca 100644
--- a/tests/grpc-proxyless/consumer/main.go
+++ b/tests/grpc-proxyless/consumer/main.go
@@ -25,8 +25,10 @@ import (
"net"
"os"
"os/signal"
+ "regexp"
"strings"
"syscall"
+ "time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -110,12 +112,39 @@ type grpcLogger struct {
logger *log.Logger
}
+var (
+	// Regex to match gRPC formatting errors like %!p(...)
+	formatErrorRegex = regexp.MustCompile(`%!p\([^)]+\)`)
+)
+
+// cleanMessage removes formatting errors from gRPC logs
+// Fixes issues like: "\u003c%!p(networktype.keyType=grpc.internal.transport.networktype)\u003e": "unix"
+func cleanMessage(msg string) string {
+	// Replace %!p(...) patterns with a cleaner representation
+	msg = formatErrorRegex.ReplaceAllStringFunc(msg, func(match string) string {
+		// Extract the key from %!p(networktype.keyType=...)
+		if strings.Contains(match, "networktype.keyType") {
+			return `"networktype"`
+		}
+		// For other cases, just remove the error pattern
+		return ""
+	})
+	// Also clean up Unicode escape sequences that appear with formatting errors
+	// Replace \u003c (which is <) and \u003e (which is >) when they appear with formatting errors
+	msg = strings.ReplaceAll(msg, `\u003c`, "<")
+	msg = strings.ReplaceAll(msg, `\u003e`, ">")
+	// Clean up patterns like <...>: "unix" to just show the value
+	msg = regexp.MustCompile(`<[^>]*>:\s*"unix"`).ReplaceAllString(msg, `"networktype": "unix"`)
+	return msg
+}
+
func (l *grpcLogger) Info(args ...interface{}) {
msg := fmt.Sprint(args...)
// Filter out xDS "entering mode: SERVING" logs
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ msg = cleanMessage(msg)
l.logger.Print("INFO: ", msg)
}
@@ -124,6 +153,7 @@ func (l *grpcLogger) Infoln(args ...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ msg = cleanMessage(msg)
l.logger.Print("INFO: ", msg)
}
@@ -132,19 +162,23 @@ func (l *grpcLogger) Infof(format string, args
...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ msg = cleanMessage(msg)
l.logger.Printf("INFO: %s", msg)
}
func (l *grpcLogger) Warning(args ...interface{}) {
- l.logger.Print("WARNING: ", fmt.Sprint(args...))
+ msg := cleanMessage(fmt.Sprint(args...))
+ l.logger.Print("WARNING: ", msg)
}
func (l *grpcLogger) Warningln(args ...interface{}) {
- l.logger.Print("WARNING: ", fmt.Sprintln(args...))
+ msg := cleanMessage(fmt.Sprintln(args...))
+ l.logger.Print("WARNING: ", msg)
}
func (l *grpcLogger) Warningf(format string, args ...interface{}) {
- l.logger.Printf("WARNING: %s", fmt.Sprintf(format, args...))
+ msg := cleanMessage(fmt.Sprintf(format, args...))
+ l.logger.Printf("WARNING: %s", msg)
}
func (l *grpcLogger) Error(args ...interface{}) {
@@ -153,6 +187,15 @@ func (l *grpcLogger) Error(args ...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ // Filter out common connection reset errors - these are normal network
behavior
+ // when clients disconnect before completing the HTTP/2 handshake
+ if strings.Contains(msg, "connection reset by peer") ||
+ strings.Contains(msg, "failed to receive the preface from
client") ||
+ strings.Contains(msg, "connection error") {
+ // These are normal network events, log at DEBUG level instead
of ERROR
+ return
+ }
+ msg = cleanMessage(msg)
l.logger.Print("ERROR: ", msg)
}
@@ -161,6 +204,13 @@ func (l *grpcLogger) Errorln(args ...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ // Filter out common connection reset errors - these are normal network
behavior
+ if strings.Contains(msg, "connection reset by peer") ||
+ strings.Contains(msg, "failed to receive the preface from
client") ||
+ strings.Contains(msg, "connection error") {
+ return
+ }
+ msg = cleanMessage(msg)
l.logger.Print("ERROR: ", msg)
}
@@ -169,6 +219,13 @@ func (l *grpcLogger) Errorf(format string, args
...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ // Filter out common connection reset errors - these are normal network
behavior
+ if strings.Contains(msg, "connection reset by peer") ||
+ strings.Contains(msg, "failed to receive the preface from
client") ||
+ strings.Contains(msg, "connection error") {
+ return
+ }
+ msg = cleanMessage(msg)
l.logger.Printf("ERROR: %s", msg)
}
@@ -188,6 +245,35 @@ func (l *grpcLogger) V(level int) bool {
return level <= 0
}
+// waitForBootstrapFile waits for the grpc-bootstrap.json file to exist
+// This is necessary because the dubbo-proxy sidecar needs time to generate the file
+func waitForBootstrapFile(bootstrapPath string, maxWait time.Duration) error {
+	log.Printf("Waiting for bootstrap file to exist: %s (max wait: %v)", bootstrapPath, maxWait)
+
+	ctx, cancel := context.WithTimeout(context.Background(), maxWait)
+	defer cancel()
+
+	ticker := time.NewTicker(500 * time.Millisecond)
+	defer ticker.Stop()
+
+	startTime := time.Now()
+	for {
+		// Check if file exists and is not empty
+		if info, err := os.Stat(bootstrapPath); err == nil && info.Size() > 0 {
+			log.Printf("Bootstrap file found after %v: %s", time.Since(startTime), bootstrapPath)
+			return nil
+		}
+
+		// Check for timeout
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("timeout waiting for bootstrap file: %s (waited %v)", bootstrapPath, time.Since(startTime))
+		case <-ticker.C:
+			// Continue waiting
+		}
+	}
+}
+
func main() {
flag.Parse()
@@ -202,6 +288,19 @@ func main() {
hostname = "unknown"
}
+	// Get bootstrap file path from environment variable or use default
+	bootstrapPath := os.Getenv("GRPC_XDS_BOOTSTRAP")
+	if bootstrapPath == "" {
+		bootstrapPath = "/etc/dubbo/proxy/grpc-bootstrap.json"
+		log.Printf("GRPC_XDS_BOOTSTRAP not set, using default: %s", bootstrapPath)
+	}
+
+	// Wait for bootstrap file to exist before creating xDS server
+	// The dubbo-proxy sidecar needs time to generate this file
+	if err := waitForBootstrapFile(bootstrapPath, 60*time.Second); err != nil {
+		log.Fatalf("Failed to wait for bootstrap file: %v", err)
+	}
+
// Create xDS-enabled gRPC server
// For proxyless gRPC, we use xds.NewGRPCServer() instead of
grpc.NewServer()
creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
@@ -237,7 +336,14 @@ func main() {
server.GracefulStop()
}()
+	// Serve the gRPC server
+	// Note: server.Serve returns when the listener is closed, which is normal during shutdown
+	// Connection reset errors are handled by the gRPC library and logged separately
 	if err := server.Serve(lis); err != nil {
-		log.Fatalf("Failed to serve: %v", err)
+		// Only log as fatal if it's not a normal shutdown (listener closed)
+		if !strings.Contains(err.Error(), "use of closed network connection") {
+			log.Fatalf("Failed to serve: %v", err)
+		}
+		log.Printf("Server stopped: %v", err)
 	}
 }
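
For context on the consumer side of this fix, a minimal proxyless gRPC client that would consume the LDS/RDS/CDS/EDS configuration pushed above looks roughly like the sketch below; the target hostname and port are placeholders, a reasonably recent grpc-go is assumed for grpc.NewClient, and GRPC_XDS_BOOTSTRAP must point at a valid bootstrap file such as the one waited on above.

package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	xdscreds "google.golang.org/grpc/credentials/xds"
	_ "google.golang.org/grpc/xds" // registers the "xds:///" resolver and LB policies
)

func main() {
	// Fall back to plaintext if the bootstrap carries no security configuration.
	creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
		FallbackCreds: insecure.NewCredentials(),
	})
	if err != nil {
		log.Fatalf("failed to create xDS client credentials: %v", err)
	}
	// The target is a placeholder; LDS/RDS/CDS/EDS for it are resolved via the
	// control plane named in the GRPC_XDS_BOOTSTRAP file.
	conn, err := grpc.NewClient("xds:///producer.default.svc.cluster.local:8080",
		grpc.WithTransportCredentials(creds))
	if err != nil {
		log.Fatalf("failed to create xDS-backed client: %v", err)
	}
	defer conn.Close()
	// Generated service stubs would be created from conn here; RPCs fail with
	// "weighted-target: no targets to pick from" only if EDS never delivers
	// usable endpoints for the routed cluster.
}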
diff --git a/tests/grpc-proxyless/producer/main.go b/tests/grpc-proxyless/producer/main.go
index acc17116..d4952140 100644
--- a/tests/grpc-proxyless/producer/main.go
+++ b/tests/grpc-proxyless/producer/main.go
@@ -26,6 +26,7 @@ import (
"net"
"os"
"os/signal"
+ "regexp"
"strings"
"sync"
"syscall"
@@ -54,11 +55,38 @@ type grpcLogger struct {
logger *log.Logger
}
+var (
+	// Regex to match gRPC formatting errors like %!p(...)
+	formatErrorRegex = regexp.MustCompile(`%!p\([^)]+\)`)
+)
+
+// cleanMessage removes formatting errors from gRPC logs
+// Fixes issues like: "\u003c%!p(networktype.keyType=grpc.internal.transport.networktype)\u003e": "unix"
+func cleanMessage(msg string) string {
+	// Replace %!p(...) patterns with a cleaner representation
+	msg = formatErrorRegex.ReplaceAllStringFunc(msg, func(match string) string {
+		// Extract the key from %!p(networktype.keyType=...)
+		if strings.Contains(match, "networktype.keyType") {
+			return `"networktype"`
+		}
+		// For other cases, just remove the error pattern
+		return ""
+	})
+	// Also clean up Unicode escape sequences that appear with formatting errors
+	// Replace \u003c (which is <) and \u003e (which is >) when they appear with formatting errors
+	msg = strings.ReplaceAll(msg, `\u003c`, "<")
+	msg = strings.ReplaceAll(msg, `\u003e`, ">")
+	// Clean up patterns like <...>: "unix" to just show the value
+	msg = regexp.MustCompile(`<[^>]*>:\s*"unix"`).ReplaceAllString(msg, `"networktype": "unix"`)
+	return msg
+}
+
func (l *grpcLogger) Info(args ...interface{}) {
msg := fmt.Sprint(args...)
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ msg = cleanMessage(msg)
l.logger.Print("INFO: ", msg)
}
@@ -67,6 +95,7 @@ func (l *grpcLogger) Infoln(args ...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ msg = cleanMessage(msg)
l.logger.Print("INFO: ", msg)
}
@@ -75,19 +104,23 @@ func (l *grpcLogger) Infof(format string, args
...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ msg = cleanMessage(msg)
l.logger.Printf("INFO: %s", msg)
}
func (l *grpcLogger) Warning(args ...interface{}) {
- l.logger.Print("WARNING: ", fmt.Sprint(args...))
+ msg := cleanMessage(fmt.Sprint(args...))
+ l.logger.Print("WARNING: ", msg)
}
func (l *grpcLogger) Warningln(args ...interface{}) {
- l.logger.Print("WARNING: ", fmt.Sprintln(args...))
+ msg := cleanMessage(fmt.Sprintln(args...))
+ l.logger.Print("WARNING: ", msg)
}
func (l *grpcLogger) Warningf(format string, args ...interface{}) {
- l.logger.Printf("WARNING: %s", fmt.Sprintf(format, args...))
+ msg := cleanMessage(fmt.Sprintf(format, args...))
+ l.logger.Printf("WARNING: %s", msg)
}
func (l *grpcLogger) Error(args ...interface{}) {
@@ -95,6 +128,7 @@ func (l *grpcLogger) Error(args ...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ msg = cleanMessage(msg)
l.logger.Print("ERROR: ", msg)
}
@@ -103,6 +137,7 @@ func (l *grpcLogger) Errorln(args ...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ msg = cleanMessage(msg)
l.logger.Print("ERROR: ", msg)
}
@@ -111,6 +146,7 @@ func (l *grpcLogger) Errorf(format string, args
...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ msg = cleanMessage(msg)
l.logger.Printf("ERROR: %s", msg)
}