This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new f8be1cae fix: grpc proxyless xds connection and test example (#809)
f8be1cae is described below

commit f8be1cae86c735c5bf26cb4903bb538e51982507
Author: Jian Zhong <[email protected]>
AuthorDate: Fri Nov 7 12:42:27 2025 +0800

    fix: grpc proxyless xds connection and test example (#809)
---
 README.md                                          |   8 +-
 pkg/dubbo-agent/xds_proxy.go                       |  51 ++-
 pkg/xds/server.go                                  |  25 ++
 sail/pkg/model/context.go                          |  17 +-
 sail/pkg/model/endpointshards.go                   |  29 +-
 sail/pkg/model/service.go                          |  32 +-
 sail/pkg/model/typed_xds_cache.go                  |  19 +-
 sail/pkg/networking/grpcgen/cds.go                 |  16 +
 sail/pkg/networking/grpcgen/grpcgen.go             |  18 +-
 sail/pkg/networking/grpcgen/lds.go                 | 329 +++++++++++++++--
 sail/pkg/networking/grpcgen/rds.go                 |  94 ++++-
 sail/pkg/networking/util/util.go                   |   3 +-
 .../serviceregistry/kube/controller/controller.go  |  88 ++++-
 .../kube/controller/endpointslice.go               | 193 +++++++++-
 sail/pkg/xds/ads.go                                |  48 ++-
 sail/pkg/xds/discovery.go                          |   9 +
 sail/pkg/xds/eds.go                                |  66 +++-
 sail/pkg/xds/endpoints/endpoint_builder.go         |  63 +++-
 sail/pkg/xds/lds.go                                |  10 +-
 sail/pkg/xds/xdsgen.go                             | 363 ++++++++++++++++++-
 samples/example/README.md                          |   1 -
 samples/grpc-proxyless/grpc-proxyless.yaml         | 107 ++++++
 test/grpc-proxyless/.dockerignore                  |   6 +
 test/grpc-proxyless/.gitignore                     |   2 +
 test/grpc-proxyless/Dockerfile.consumer            |  35 ++
 test/grpc-proxyless/Dockerfile.producer            |  35 ++
 test/grpc-proxyless/README.md                      |  12 +
 test/grpc-proxyless/consumer/main.go               | 135 +++++++
 test/grpc-proxyless/generate-proto.sh              |  12 +
 test/grpc-proxyless/go.mod                         |  29 ++
 test/grpc-proxyless/go.sum                         |  76 ++++
 test/grpc-proxyless/producer/main.go               | 401 +++++++++++++++++++++
 test/grpc-proxyless/proto/echo.proto               |  42 +++
 test/grpc-proxyless/proto/gen.sh                   |   7 +
 test/grpc-proxyless/test-commands.sh               |  49 +++
 test/grpc-proxyless/test.sh                        |  56 +++
 36 files changed, 2382 insertions(+), 104 deletions(-)
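
For context, the new test client under test/grpc-proxyless/consumer is a proxyless gRPC client: it resolves its target through the xds:/// scheme using the bootstrap provided by the dubbo-agent rather than DNS. A minimal sketch of such a client, assuming a recent grpc-go; the service name and port are illustrative and not lifted from consumer/main.go:

    package main

    import (
    	"log"
    	"time"

    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	_ "google.golang.org/grpc/xds" // registers the "xds" resolver; reads the GRPC_XDS_BOOTSTRAP file
    )

    func main() {
    	// The target is resolved through LDS/RDS/CDS/EDS pushed by the control
    	// plane, not through DNS. Hostname and port here are illustrative only.
    	conn, err := grpc.NewClient("xds:///producer.grpc-proxyless.svc.cluster.local:7070",
    		grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		log.Fatalf("create client: %v", err)
    	}
    	defer conn.Close()

    	conn.Connect()
    	for i := 0; i < 5; i++ {
    		// With a healthy xDS stream the channel should leave IDLE/CONNECTING
    		// once the listener, route, cluster, and endpoints have arrived.
    		log.Printf("connectivity state: %v", conn.GetState())
    		time.Sleep(time.Second)
    	}
    }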

diff --git a/README.md b/README.md
index 44663ae1..fc5687a4 100644
--- a/README.md
+++ b/README.md
@@ -13,9 +13,9 @@ The main repositories of Dubbo on Kubernetes include:
 
 - **dubboctl** — The command-line management tool that provides control plane 
management, development framework scaffolding, and application deployment.
 - **dubbod** — The dubbo control plane. It is built on Istio to implement a 
proxyless service mesh and includes the following components:
-  - **Sail** - (under development): Runtime proxy configuration.
-  - **Aegis** - (under development): Certificate issuance and rotation.
-  - **Gear** - (under development): Validation, aggregation, transformation, 
and distribution of Dubbo configuration.
+  - **sail** - Runtime proxy configuration.
+  - **aegis** - Certificate issuance and rotation.
+  - **gear** - Validation, aggregation, transformation, and distribution of 
Dubbo configuration.
 - **operator**: Provides user-friendly options to operate the Dubbo proxyless 
service mesh.
 
 ## Quick Start
@@ -25,4 +25,4 @@ Please refer to [official 
website](https://cn.dubbo.apache.org/zh-cn/overview/ho
 Refer to 
[CONTRIBUTING.md](https://github.com/apache/dubbo-kubernetes/blob/master/CONTRIBUTING.md)
 
 ## License
-Apache License 2.0, see 
[LICENSE](https://github.com/apache/dubbo-kubernetes/blob/master/LICENSE).
\ No newline at end of file
+Apache License 2.0, see 
[LICENSE](https://github.com/apache/dubbo-kubernetes/blob/master/LICENSE).
diff --git a/pkg/dubbo-agent/xds_proxy.go b/pkg/dubbo-agent/xds_proxy.go
index 66917adb..7a5cd2dc 100644
--- a/pkg/dubbo-agent/xds_proxy.go
+++ b/pkg/dubbo-agent/xds_proxy.go
@@ -163,6 +163,9 @@ func (p *XdsProxy) handleUpstream(ctx context.Context, con 
*ProxyConnection, xds
                                klog.Infof("XDS proxy: connection #%d received 
first response from upstream: TypeUrl=%s, Resources=%d",
                                        con.conID, 
model.GetShortType(resp.TypeUrl), len(resp.Resources))
                                firstResponse = false
+                       } else {
+                               klog.V(3).Infof("XDS proxy: connection #%d 
received response from upstream: TypeUrl=%s, Resources=%d, VersionInfo=%s",
+                                       con.conID, 
model.GetShortType(resp.TypeUrl), len(resp.Resources), resp.VersionInfo)
                        }
                        select {
                        case con.responsesChan <- resp:
@@ -202,7 +205,7 @@ func (p *XdsProxy) handleUpstreamRequest(con 
*ProxyConnection) {
                                return
                        }
 
-                       klog.V(2).Infof("XDS proxy: connection #%d received 
request: TypeUrl=%s, Node=%v, ResourceNames=%d",
+                       klog.V(3).Infof("XDS proxy: connection #%d received 
request: TypeUrl=%s, Node=%v, ResourceNames=%d",
                                con.conID, model.GetShortType(req.TypeUrl), 
req.Node != nil, len(req.ResourceNames))
 
                        // Save Node from first request that contains it
@@ -242,10 +245,26 @@ func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
                        }
 
                        // Ensure Node is set in request before forwarding
+                       // For proxyless gRPC, we must ensure Node is always set in every request
                        con.nodeMutex.RLock()
-                       if con.node != nil && req.Node == nil {
-                               req.Node = con.node
-                               klog.V(2).Infof("XDS proxy: connection #%d added saved Node to request", con.conID)
+                       if con.node != nil {
+                               if req.Node == nil {
+                                       // Deep copy Node to avoid race conditions
+                                       req.Node = &core.Node{
+                                               Id:       con.node.Id,
+                                               Cluster:  con.node.Cluster,
+                                               Locality: con.node.Locality,
+                                               Metadata: con.node.Metadata,
+                                       }
+                                       klog.V(2).Infof("XDS proxy: connection #%d added saved Node to request", con.conID)
+                               } else if req.Node.Id == "" {
+                                       // If Node exists but Id is empty, copy from saved node
+                                       req.Node.Id = con.node.Id
+                                       if req.Node.Metadata == nil {
+                                               req.Node.Metadata = con.node.Metadata
+                                       }
+                                       klog.V(2).Infof("XDS proxy: connection #%d filled empty Node.Id in request", con.conID)
+                               }
                        }
                        con.nodeMutex.RUnlock()
 
@@ -314,9 +333,24 @@ func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
                        }
 
                        // Ensure Node is set before sending to upstream
+                       // For proxyless gRPC, we must ensure Node is always set in every request
                        con.nodeMutex.RLock()
-                       if con.node != nil && req.Node == nil {
-                               req.Node = con.node
+                       if con.node != nil {
+                               if req.Node == nil {
+                                       // Deep copy Node to avoid race conditions
+                                       req.Node = &core.Node{
+                                               Id:       con.node.Id,
+                                               Cluster:  con.node.Cluster,
+                                               Locality: con.node.Locality,
+                                               Metadata: con.node.Metadata,
+                                       }
+                               } else if req.Node.Id == "" {
+                                       // If Node exists but Id is empty, copy from saved node
+                                       req.Node.Id = con.node.Id
+                                       if req.Node.Metadata == nil {
+                                               req.Node.Metadata = con.node.Metadata
+                                       }
+                               }
                        }
                        con.nodeMutex.RUnlock()
 
@@ -412,12 +446,15 @@ func (p *XdsProxy) handleUpstreamResponse(con 
*ProxyConnection) {
                        }
 
                        // Forward all non-internal responses to downstream 
(gRPC client)
-                       // This is critical for proxyless gRPC clients to 
receive XDS configuration
+                       klog.V(3).Infof("XDS proxy: connection #%d forwarding 
response to downstream: TypeUrl=%s, Resources=%d, VersionInfo=%s",
+                               con.conID, model.GetShortType(resp.TypeUrl), 
len(resp.Resources), resp.VersionInfo)
                        if err := con.downstream.Send(resp); err != nil {
                                klog.Errorf("XDS proxy: connection #%d failed 
to send response to downstream: %v", con.conID, err)
                                downstreamErr(con, err)
                                return
                        }
+                       klog.V(3).Infof("XDS proxy: connection #%d successfully 
forwarded response to downstream: TypeUrl=%s, Resources=%d",
+                               con.conID, model.GetShortType(resp.TypeUrl), 
len(resp.Resources))
 
                        // Send ACK for normal XDS responses (if not already 
sent by handler)
                        if !hasHandler {
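
The Node handling added above can be read as a small helper; the following is an illustrative extraction rather than the exact code in xds_proxy.go (note the copy is shallow: Metadata is shared by reference, as in the patch):

    package proxyexample

    import (
    	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    )

    // ensureNode fills in the node identity on a request about to be forwarded
    // upstream, using the node captured from the first downstream request.
    // Copying the scalar fields instead of sharing the saved *core.Node keeps
    // the cached value from being mutated through the request.
    func ensureNode(req *discovery.DiscoveryRequest, saved *core.Node) {
    	if saved == nil {
    		return
    	}
    	switch {
    	case req.Node == nil:
    		req.Node = &core.Node{
    			Id:       saved.Id,
    			Cluster:  saved.Cluster,
    			Locality: saved.Locality,
    			Metadata: saved.Metadata, // shared by reference, matching the patch
    		}
    	case req.Node.Id == "":
    		req.Node.Id = saved.Id
    		if req.Node.Metadata == nil {
    			req.Node.Metadata = saved.Metadata
    		}
    	}
    }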
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index 9446d8aa..b3282211 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -322,6 +322,31 @@ func ShouldRespond(w Watcher, id string, request *discovery.DiscoveryRequest) (b
        removed := previousResources.Difference(cur)
        added := cur.Difference(previousResources)
 
+       // CRITICAL FIX: For proxyless gRPC, if client sends wildcard (empty ResourceNames) after receiving specific resources,
+       // this is likely an ACK and we should NOT push all resources again
+       // Check if this is a wildcard request after specific resources were sent
+       if len(request.ResourceNames) == 0 && len(previousResources) > 0 && previousInfo.NonceSent != "" {
+               // This is a wildcard request after specific resources were sent
+               // For proxyless gRPC clients, this should be treated as ACK, not a request for all resources
+               // The client already has the resources from the previous push
+               // Check if this is proxyless by attempting to get the proxy from watcher
+               // For now, we'll check if this is a proxyless scenario by the context
+               // If previous resources existed and client sends empty names with matching nonce, it's an ACK
+               klog.V(2).Infof("ADS:%s: wildcard request after specific resources (prev: %d resources, nonce: %s), treating as ACK",
+                       stype, len(previousResources), previousInfo.NonceSent)
+               // Update ResourceNames to keep previous resources (don't clear them)
+               w.UpdateWatchedResource(request.TypeUrl, func(wr *WatchedResource) *WatchedResource {
+                       if wr == nil {
+                               return nil
+                       }
+                       // Keep the previous ResourceNames, don't clear them with empty set
+                       // The client is ACKing the previous push, not requesting all resources
+                       wr.ResourceNames = previousResources
+                       return wr
+               })
+               return false, emptyResourceDelta
+       }
+
        // We should always respond "alwaysRespond" marked requests to let Envoy finish warming
        // even though Nonce match and it looks like an ACK.
        if alwaysRespond {
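
The early return added to ShouldRespond hinges on a single condition; a simplified, self-contained restatement (the real code operates on the repo's Watcher, WatchedResource, and sets types and also rewrites the watched ResourceNames):

    // shouldTreatAsAck mirrors the new early-return in ShouldRespond: an empty
    // ResourceNames list that arrives after specific resources were already
    // pushed (and a nonce was recorded) is read as an ACK of that push, not as
    // a wildcard subscription for every resource.
    func shouldTreatAsAck(requestedNames []string, previouslySent map[string]struct{}, nonceSent string) bool {
    	return len(requestedNames) == 0 && len(previouslySent) > 0 && nonceSent != ""
    }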
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index c907ea78..38e255c5 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -326,7 +326,7 @@ func ParseServiceNodeWithMetadata(nodeID string, metadata *NodeMetadata) (*Proxy
        }
 
        if len(parts) != 4 {
-               return out, fmt.Errorf("missing parts in the service node %q", nodeID)
+               return out, fmt.Errorf("missing parts in the service node %q (expected 4 parts, got %d)", nodeID, len(parts))
        }
 
        // Extract IP address from parts[1] (format: type~ip~id~domain)
@@ -338,13 +338,24 @@ func ParseServiceNodeWithMetadata(nodeID string, metadata *NodeMetadata) (*Proxy
                }
        }
 
-       // Does query from ingress or router have to carry valid IP address?
+       // If IP address is empty in node ID, we still need to validate it
+       // For proxyless gRPC, IP address should be set, but we'll allow it to be set later
+       // if it's empty, we'll set it from pod IP when ServiceTargets are computed
        if len(out.IPAddresses) == 0 {
-               return out, fmt.Errorf("no valid IP address in the service node id or metadata")
+               // IP address will be set later when ServiceTargets are computed from pod IP
+               // For now, we allow empty IP for proxyless nodes to avoid failing initialization
+               // The IP will be populated when GetProxyServiceTargets is called
+               out.IPAddresses = []string{}
        }
 
        out.ID = parts[2]
        out.DNSDomain = parts[3]
+
+       // Validate that ID is not empty - this is critical for proxyless gRPC
+       if len(out.ID) == 0 {
+               return out, fmt.Errorf("node ID is empty in service node %q (parts[2] is empty)", nodeID)
+       }
+
        return out, nil
 }
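
The node ID parsed here follows the Istio-style type~ip~id~domain layout; a standalone sketch of the parsing rules the patch now enforces (field and type names are simplified stand-ins for model.Proxy):

    package main

    import (
    	"fmt"
    	"strings"
    )

    // proxyIdentity is a trimmed-down stand-in for model.Proxy, just enough to
    // show the parsing rules.
    type proxyIdentity struct {
    	Type        string
    	IPAddresses []string
    	ID          string
    	DNSDomain   string
    }

    // parseServiceNode applies the rules from ParseServiceNodeWithMetadata to a
    // node ID of the form type~ip~id~domain.
    func parseServiceNode(nodeID string) (proxyIdentity, error) {
    	parts := strings.Split(nodeID, "~")
    	if len(parts) != 4 {
    		return proxyIdentity{}, fmt.Errorf("missing parts in the service node %q (expected 4 parts, got %d)", nodeID, len(parts))
    	}
    	out := proxyIdentity{Type: parts[0], ID: parts[2], DNSDomain: parts[3]}
    	if out.ID == "" {
    		return proxyIdentity{}, fmt.Errorf("node ID is empty in service node %q", nodeID)
    	}
    	// An empty IP is now tolerated for proxyless nodes; it is filled in
    	// later from the pod IP when the service targets are computed.
    	if parts[1] != "" {
    		out.IPAddresses = []string{parts[1]}
    	}
    	return out, nil
    }

    func main() {
    	// Hypothetical proxyless node ID that carries no IP yet.
    	id, err := parseServiceNode("sidecar~~consumer-0.grpc-proxyless~grpc-proxyless.svc.cluster.local")
    	fmt.Println(id, err)
    }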
 
diff --git a/sail/pkg/model/endpointshards.go b/sail/pkg/model/endpointshards.go
index 81808566..1e57f0e3 100644
--- a/sail/pkg/model/endpointshards.go
+++ b/sail/pkg/model/endpointshards.go
@@ -1,9 +1,10 @@
 package model
 
 import (
-       "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
        "sync"
 
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
+
        "github.com/apache/dubbo-kubernetes/pkg/cluster"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
@@ -160,8 +161,32 @@ func (e *EndpointIndex) UpdateServiceEndpoints(
        oldDubboEndpoints := ep.Shards[shard]
        newDubboEndpoints, needPush := 
endpointUpdateRequiresPush(oldDubboEndpoints, dubboEndpoints)
 
+       // CRITICAL: Log endpoint update details for debugging
+       if logPushType {
+               oldHealthyCount := 0
+               oldUnhealthyCount := 0
+               newHealthyCount := 0
+               newUnhealthyCount := 0
+               for _, ep := range oldDubboEndpoints {
+                       if ep.HealthStatus == Healthy {
+                               oldHealthyCount++
+                       } else {
+                               oldUnhealthyCount++
+                       }
+               }
+               for _, ep := range newDubboEndpoints {
+                       if ep.HealthStatus == Healthy {
+                               newHealthyCount++
+                       } else {
+                               newUnhealthyCount++
+                       }
+               }
+               klog.Warningf("UpdateServiceEndpoints: service=%s, shard=%v, 
oldEndpoints=%d (healthy=%d, unhealthy=%d), newEndpoints=%d (healthy=%d, 
unhealthy=%d), needPush=%v, pushType=%v",
+                       hostname, shard, len(oldDubboEndpoints), 
oldHealthyCount, oldUnhealthyCount, len(newDubboEndpoints), newHealthyCount, 
newUnhealthyCount, needPush, pushType)
+       }
+
        if pushType != FullPush && !needPush {
-               klog.V(2).Infof("No push, either old endpoint health status did 
not change or new endpoint came with unhealthy status, %v", hostname)
+               klog.Warningf("No push, either old endpoint health status did 
not change or new endpoint came with unhealthy status, %v (oldEndpoints=%d, 
newEndpoints=%d)", hostname, len(oldDubboEndpoints), len(newDubboEndpoints))
                pushType = NoPush
        }
 
diff --git a/sail/pkg/model/service.go b/sail/pkg/model/service.go
index c0527192..9a4ab916 100644
--- a/sail/pkg/model/service.go
+++ b/sail/pkg/model/service.go
@@ -155,6 +155,17 @@ func (ep *DubboEndpoint) Equals(other *DubboEndpoint) bool 
{
                return false
        }
 
+       // CRITICAL FIX: Compare HealthStatus to detect health status changes
+       // This is necessary to trigger EDS push when endpoints become 
healthy/unhealthy
+       if ep.HealthStatus != other.HealthStatus {
+               return false
+       }
+
+       // CRITICAL FIX: Compare EndpointPort to detect port changes
+       if ep.EndpointPort != other.EndpointPort {
+               return false
+       }
+
        return true
 }
 
@@ -470,13 +481,32 @@ func (s *ServiceAttributes) Equals(other 
*ServiceAttributes) bool {
 }
 
 func (s *Service) SupportsUnhealthyEndpoints() bool {
-       return false
+       // CRITICAL FIX: Return PublishNotReadyAddresses to support publishing 
not-ready endpoints
+       // This allows endpoints with Ready=false to be included in EDS if the 
service has
+       // publishNotReadyAddresses=true, which is useful for services that 
need to receive
+       // traffic even before they are fully ready (e.g., during startup).
+       // CRITICAL FIX: Check if s is nil before accessing Attributes
+       if s == nil {
+               return false
+       }
+       return s.Attributes.PublishNotReadyAddresses
 }
 
 func (s *Service) SupportsDrainingEndpoints() bool {
        return false
 }
 
+// GetAddressForProxy returns the primary address for a service from the 
proxy's perspective.
+// This is used for outbound listener addresses in gRPC proxyless mode.
+func (s *Service) GetAddressForProxy(node *Proxy) string {
+       addresses := s.getAllAddressesForProxy(node)
+       if len(addresses) > 0 {
+               return addresses[0]
+       }
+       // Default to 0.0.0.0 if no address found, which matches outbound 
listener behavior
+       return "0.0.0.0"
+}
+
 func (s *Service) GetExtraAddressesForProxy(node *Proxy) []string {
        addresses := s.getAllAddressesForProxy(node)
        if len(addresses) > 1 {
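
SupportsUnhealthyEndpoints now simply reflects the Kubernetes publishNotReadyAddresses flag. A hypothetical Service definition that would opt in, expressed with client-go types (name, namespace, and ports are illustrative):

    package k8sexample

    import (
    	corev1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/apimachinery/pkg/util/intstr"
    )

    // producerService is a hypothetical Service that opts into publishing
    // not-ready addresses, which is what SupportsUnhealthyEndpoints now checks.
    // Endpoints of such a service reach EDS before readiness probes pass.
    var producerService = &corev1.Service{
    	ObjectMeta: metav1.ObjectMeta{Name: "producer", Namespace: "grpc-proxyless"},
    	Spec: corev1.ServiceSpec{
    		Selector:                 map[string]string{"app": "producer"},
    		PublishNotReadyAddresses: true,
    		Ports: []corev1.ServicePort{{
    			Name:       "grpc",
    			Port:       7070,                  // service port, as used in names like outbound|7070||...
    			TargetPort: intstr.FromInt(17070), // container port; illustrative
    		}},
    	},
    }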
diff --git a/sail/pkg/model/typed_xds_cache.go 
b/sail/pkg/model/typed_xds_cache.go
index af1aec08..883990a7 100644
--- a/sail/pkg/model/typed_xds_cache.go
+++ b/sail/pkg/model/typed_xds_cache.go
@@ -2,6 +2,9 @@ package model
 
 import (
        "fmt"
+       "sync"
+       "time"
+
        "github.com/apache/dubbo-kubernetes/pkg/slices"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/sail/pkg/features"
@@ -9,8 +12,7 @@ import (
        "github.com/google/go-cmp/cmp"
        "github.com/hashicorp/golang-lru/v2/simplelru"
        "google.golang.org/protobuf/testing/protocmp"
-       "sync"
-       "time"
+       "k8s.io/klog/v2"
 )
 
 type CacheToken uint64
@@ -186,13 +188,20 @@ func (l *lruCache[K]) Clear(configs sets.Set[ConfigKey]) {
        defer func() {
                l.evictedOnClear = false
        }()
+       clearedCount := 0
        for ckey := range configs {
                hc := ckey.HashCode()
                referenced := l.configIndex[hc]
-               delete(l.configIndex, hc)
-               for key := range referenced {
-                       l.store.Remove(key)
+               if len(referenced) > 0 {
+                       clearedCount += len(referenced)
+                       for key := range referenced {
+                               l.store.Remove(key)
+                       }
                }
+               delete(l.configIndex, hc)
+       }
+       if clearedCount > 0 {
+               klog.V(3).Infof("lruCache.Clear: cleared %d cache entries for 
%d configs", clearedCount, len(configs))
        }
 }
 
diff --git a/sail/pkg/networking/grpcgen/cds.go 
b/sail/pkg/networking/grpcgen/cds.go
index 5b085654..c5d87142 100644
--- a/sail/pkg/networking/grpcgen/cds.go
+++ b/sail/pkg/networking/grpcgen/cds.go
@@ -2,6 +2,7 @@ package grpcgen
 
 import (
        "fmt"
+
        "github.com/apache/dubbo-kubernetes/pkg/config/host"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
@@ -98,6 +99,11 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
        var defaultCluster *cluster.Cluster
        if b.filter.Contains(b.defaultClusterName) {
                defaultCluster = b.edsCluster(b.defaultClusterName)
+               // CRITICAL: For gRPC proxyless, we need to set CommonLbConfig 
to handle endpoint health status
+               // This ensures that the cluster can use healthy endpoints for 
load balancing
+               if defaultCluster.CommonLbConfig == nil {
+                       defaultCluster.CommonLbConfig = 
&cluster.Cluster_CommonLbConfig{}
+               }
                if b.svc.SupportsDrainingEndpoints() {
                        // see core/v1alpha3/cluster.go
                        defaultCluster.CommonLbConfig.OverrideHostStatus = 
&core.HealthStatusSet{
@@ -106,6 +112,13 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
                                        core.HealthStatus_DRAINING, 
core.HealthStatus_UNKNOWN, core.HealthStatus_DEGRADED,
                                },
                        }
+               } else {
+                       // For gRPC proxyless, only use HEALTHY endpoints by 
default
+                       defaultCluster.CommonLbConfig.OverrideHostStatus = 
&core.HealthStatusSet{
+                               Statuses: []core.HealthStatus{
+                                       core.HealthStatus_HEALTHY,
+                               },
+                       }
                }
        }
 
@@ -130,6 +143,9 @@ func (b *clusterBuilder) edsCluster(name string) 
*cluster.Cluster {
                                },
                        },
                },
+               // CRITICAL: For gRPC proxyless, we need to set LbPolicy to 
ROUND_ROBIN
+               // This is the default load balancing policy for gRPC xDS 
clients
+               LbPolicy: cluster.Cluster_ROUND_ROBIN,
        }
 }
 
diff --git a/sail/pkg/networking/grpcgen/grpcgen.go 
b/sail/pkg/networking/grpcgen/grpcgen.go
index 804ed651..20d46e1f 100644
--- a/sail/pkg/networking/grpcgen/grpcgen.go
+++ b/sail/pkg/networking/grpcgen/grpcgen.go
@@ -8,13 +8,25 @@ import (
 type GrpcConfigGenerator struct{}
 
 func (g *GrpcConfigGenerator) Generate(proxy *model.Proxy, w 
*model.WatchedResource, req *model.PushRequest) (model.Resources, 
model.XdsLogDetails, error) {
+       // Extract requested resource names from WatchedResource
+       // If ResourceNames is empty (wildcard request), pass empty slice
+       // BuildListeners will handle empty names as wildcard request and 
generate all listeners
+       var requestedNames []string
+       if w != nil && w.ResourceNames != nil && len(w.ResourceNames) > 0 {
+               requestedNames = w.ResourceNames.UnsortedList()
+       }
+
        switch w.TypeUrl {
        case v3.ListenerType:
-               return g.BuildListeners(proxy, req.Push, 
w.ResourceNames.UnsortedList()), model.DefaultXdsLogDetails, nil
+               // Pass requested names to BuildListeners to ensure consistent 
behavior
+               // This is critical for gRPC proxyless clients to avoid 
resource count oscillation
+               // When requestedNames is empty (wildcard), BuildListeners 
generates all listeners
+               // When requestedNames is non-empty, BuildListeners only 
generates requested listeners
+               return g.BuildListeners(proxy, req.Push, requestedNames), 
model.DefaultXdsLogDetails, nil
        case v3.ClusterType:
-               return g.BuildClusters(proxy, req.Push, 
w.ResourceNames.UnsortedList()), model.DefaultXdsLogDetails, nil
+               return g.BuildClusters(proxy, req.Push, requestedNames), 
model.DefaultXdsLogDetails, nil
        case v3.RouteType:
-               return g.BuildHTTPRoutes(proxy, req.Push, 
w.ResourceNames.UnsortedList()), model.DefaultXdsLogDetails, nil
+               return g.BuildHTTPRoutes(proxy, req.Push, requestedNames), 
model.DefaultXdsLogDetails, nil
        }
 
        return nil, model.DefaultXdsLogDetails, nil
diff --git a/sail/pkg/networking/grpcgen/lds.go 
b/sail/pkg/networking/grpcgen/lds.go
index 810bb662..463e0a74 100644
--- a/sail/pkg/networking/grpcgen/lds.go
+++ b/sail/pkg/networking/grpcgen/lds.go
@@ -6,13 +6,18 @@ import (
        "strconv"
        "strings"
 
+       "github.com/apache/dubbo-kubernetes/pkg/config/host"
        "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent/grpcxds"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/pkg/wellknown"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
        "github.com/apache/dubbo-kubernetes/sail/pkg/networking/util"
        "github.com/apache/dubbo-kubernetes/sail/pkg/util/protoconv"
        core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
        listener 
"github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+       route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+       routerv3 
"github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
+       hcmv3 
"github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
        discovery 
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
        "k8s.io/klog/v2"
 )
@@ -70,9 +75,11 @@ func newListenerNameFilter(names []string, node 
*model.Proxy) listenerNames {
 }
 
 func (g *GrpcConfigGenerator) BuildListeners(node *model.Proxy, push 
*model.PushContext, names []string) model.Resources {
-       // If no specific names requested, generate listeners for all 
ServiceTargets
+       // For LDS (wildcard type), empty ResourceNames means request all 
listeners
+       // If names is empty, generate listeners for all ServiceTargets to 
ensure consistent behavior
+       // This prevents the client from receiving different numbers of 
listeners on different requests
        if len(names) == 0 && len(node.ServiceTargets) > 0 {
-               // Build listener names from ServiceTargets for initial request
+               // Build listener names from ServiceTargets for 
wildcard/initial request
                names = make([]string, 0, len(node.ServiceTargets))
                for _, st := range node.ServiceTargets {
                        if st.Service != nil && st.Port.ServicePort != nil {
@@ -81,19 +88,242 @@ func (g *GrpcConfigGenerator) BuildListeners(node 
*model.Proxy, push *model.Push
                                names = append(names, listenerName)
                        }
                }
+               klog.V(2).Infof("BuildListeners: wildcard request for %s, 
generating %d listeners from ServiceTargets: %v", node.ID, len(names), names)
+       } else if len(names) > 0 {
+               klog.V(2).Infof("BuildListeners: specific request for %s, 
requested listeners: %v", node.ID, names)
        }
 
+       // CRITICAL: If names is provided (non-empty), we MUST only generate 
those specific listeners
+       // This prevents the push loop where client requests 1 listener but 
receives 14
+       // The filter ensures we only generate requested listeners
        filter := newListenerNameFilter(names, node)
        resp := make(model.Resources, 0, len(filter))
-       resp = append(resp, buildOutboundListeners(node, push, filter)...)
+
+       // Build outbound listeners first (they may reference clusters that 
need CDS)
+       outboundRes := buildOutboundListeners(node, push, filter)
+       resp = append(resp, outboundRes...)
        resp = append(resp, buildInboundListeners(node, push, 
filter.inboundNames())...)
 
+       // Final safety check: ensure we only return resources that were 
requested
+       // This is critical for preventing push loops in proxyless gRPC
+       if len(names) > 0 {
+               requestedSet := sets.New(names...)
+               filtered := make(model.Resources, 0, len(resp))
+               for _, r := range resp {
+                       if requestedSet.Contains(r.Name) {
+                               filtered = append(filtered, r)
+                       } else {
+                               klog.V(2).Infof("BuildListeners: filtering out 
unrequested listener %s (requested: %v)", r.Name, names)
+                       }
+               }
+               return filtered
+       }
+
        return resp
 }
 
 func buildOutboundListeners(node *model.Proxy, push *model.PushContext, filter 
listenerNames) model.Resources {
        out := make(model.Resources, 0, len(filter))
-       // TODO SidecarScope?
+
+       // Extract outbound listener names (not inbound)
+       // The filter map uses hostname as key, and stores ports in 
listenerName.Ports
+       // We need to reconstruct the full listener name (hostname:port) for 
each port
+       type listenerInfo struct {
+               hostname string
+               ports    []string
+       }
+       var outboundListeners []listenerInfo
+
+       for name, ln := range filter {
+               if strings.HasPrefix(name, grpcxds.ServerListenerNamePrefix) {
+                       continue // Skip inbound listeners
+               }
+
+               // If this entry has ports, use them; otherwise use the name 
as-is (might already have port)
+               if len(ln.Ports) > 0 {
+                       ports := make([]string, 0, len(ln.Ports))
+                       for port := range ln.Ports {
+                               ports = append(ports, port)
+                       }
+                       outboundListeners = append(outboundListeners, 
listenerInfo{
+                               hostname: name,
+                               ports:    ports,
+                       })
+               } else {
+                       // No ports in filter, try to parse name as 
hostname:port
+                       _, _, err := net.SplitHostPort(name)
+                       if err == nil {
+                               // Name already contains port, use it directly
+                               outboundListeners = append(outboundListeners, 
listenerInfo{
+                                       hostname: name,
+                                       ports:    []string{name}, // Use full 
name as-is
+                               })
+                       } else {
+                               // No port, skip (shouldn't happen for outbound 
listeners)
+                               klog.V(2).Infof("buildOutboundListeners: 
skipping listener %s (no port information)", name)
+                       }
+               }
+       }
+
+       if len(outboundListeners) == 0 {
+               return out
+       }
+
+       // Build outbound listeners for each requested service
+       for _, li := range outboundListeners {
+               // For each port, build a listener
+               for _, portInfo := range li.ports {
+                       var hostStr, portStr string
+                       var err error
+
+                       // Check if portInfo is already "hostname:port" format
+                       hostStr, portStr, err = net.SplitHostPort(portInfo)
+                       if err != nil {
+                               // portInfo is just the port number, use 
hostname from filter key
+                               hostStr = li.hostname
+                               portStr = portInfo
+                       }
+
+                       port, err := strconv.Atoi(portStr)
+                       if err != nil {
+                               klog.Warningf("buildOutboundListeners: failed 
to parse port from %s: %v", portStr, err)
+                               continue
+                       }
+
+                       // Reconstruct full listener name for logging and final 
check
+                       fullListenerName := fmt.Sprintf("%s:%s", hostStr, 
portStr)
+
+                       // Find service in PushContext
+                       // Try different hostname formats: FQDN, short name, 
etc.
+                       hostname := host.Name(hostStr)
+                       svc := push.ServiceForHostname(node, hostname)
+
+                       // Extract short name from FQDN for fallback lookup
+                       parts := strings.Split(hostStr, ".")
+                       shortName := ""
+                       if len(parts) > 0 {
+                               shortName = parts[0]
+                       }
+
+                       // If not found with FQDN, try short name (e.g., 
"consumer" from "consumer.grpc-proxyless.svc.cluster.local")
+                       if svc == nil && shortName != "" {
+                               svc = push.ServiceForHostname(node, 
host.Name(shortName))
+                               if svc != nil {
+                                       
klog.V(2).Infof("buildOutboundListeners: found service %s using short name %s", 
svc.Hostname, shortName)
+                               }
+                       }
+
+                       // If still not found, try to find service by iterating 
all services
+                       if svc == nil {
+                               // Try to find service by matching hostname in 
all namespaces
+                               allServices := push.GetAllServices()
+                               for _, s := range allServices {
+                                       if s.Hostname == hostname || (shortName 
!= "" && strings.HasPrefix(string(s.Hostname), shortName+".")) {
+                                               svc = s
+                                               
klog.V(2).Infof("buildOutboundListeners: found service %s/%s by iterating", 
svc.Attributes.Namespace, svc.Hostname)
+                                               break
+                                       }
+                               }
+                       }
+
+                       if svc == nil {
+                               klog.Warningf("buildOutboundListeners: service 
not found for hostname %s (tried FQDN and short name)", hostStr)
+                               continue
+                       }
+
+                       // Verify service has the requested port (Service port, 
not targetPort)
+                       hasPort := false
+                       var matchedPort *model.Port
+                       for _, p := range svc.Ports {
+                               if p.Port == port {
+                                       hasPort = true
+                                       matchedPort = p
+                                       break
+                               }
+                       }
+                       if !hasPort {
+                               klog.Warningf("buildOutboundListeners: port %d 
not found in service %s (available ports: %v)",
+                                       port, hostStr, func() []int {
+                                               ports := make([]int, 0, 
len(svc.Ports))
+                                               for _, p := range svc.Ports {
+                                                       ports = append(ports, 
p.Port)
+                                               }
+                                               return ports
+                                       }())
+                               continue
+                       }
+
+                       klog.V(2).Infof("buildOutboundListeners: building outbound listener for %s:%d (service: %s/%s, port: %s)",
+                               hostStr, port, svc.Attributes.Namespace, svc.Attributes.Name, matchedPort.Name)
+
+                       // Build cluster name using BuildSubsetKey to ensure correct format
+                       // Format: outbound|port||hostname (e.g., outbound|7070||consumer.grpc-proxyless.svc.cluster.local)
+                       // Use svc.Hostname (FQDN) instead of svc.Attributes.Name (short name) to match CDS expectations
+                       clusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, "", svc.Hostname, port)
+
+                       // Build route name (same format as cluster name) for RDS
+                       routeName := clusterName
+
+                       // CRITICAL: For gRPC proxyless, outbound listeners MUST use ApiListener with RDS
+                       // This is the correct pattern used by Istio for gRPC xDS clients
+                       // Using FilterChain with inline RouteConfig causes the gRPC client to remain in IDLE state
+                       hcm := &hcmv3.HttpConnectionManager{
+                               CodecType:  hcmv3.HttpConnectionManager_AUTO,
+                               StatPrefix: fmt.Sprintf("outbound_%d_%s", port, svc.Attributes.Name),
+                               RouteSpecifier: &hcmv3.HttpConnectionManager_Rds{
+                                       Rds: &hcmv3.Rds{
+                                               ConfigSource: &core.ConfigSource{
+                                                       ConfigSourceSpecifier: &core.ConfigSource_Ads{
+                                                               Ads: &core.AggregatedConfigSource{},
+                                                       },
+                                               },
+                                               RouteConfigName: routeName,
+                                       },
+                               },
+                               HttpFilters: []*hcmv3.HttpFilter{
+                                       {
+                                               Name: "envoy.filters.http.router",
+                                               ConfigType: &hcmv3.HttpFilter_TypedConfig{
+                                                       TypedConfig: protoconv.MessageToAny(&routerv3.Router{}),
+                                               },
+                                       },
+                               },
+                       }
+
+                       // Build outbound listener with ApiListener (Istio pattern)
+                       // CRITICAL: gRPC xDS clients expect ApiListener for outbound, not FilterChain
+                       ll := &listener.Listener{
+                               Name: fullListenerName,
+                               Address: &core.Address{Address: &core.Address_SocketAddress{
+                                       SocketAddress: &core.SocketAddress{
+                                               Address: svc.GetAddressForProxy(node), // Use service VIP
+                                               PortSpecifier: &core.SocketAddress_PortValue{
+                                                       PortValue: uint32(port),
+                                               },
+                                       },
+                               }},
+                               ApiListener: &listener.ApiListener{
+                                       ApiListener: protoconv.MessageToAny(hcm),
+                               },
+                       }
+
+                       // Add extra addresses if available
+                       extrAddresses := svc.GetExtraAddressesForProxy(node)
+                       if len(extrAddresses) > 0 {
+                               ll.AdditionalAddresses = util.BuildAdditionalAddresses(extrAddresses, uint32(port))
+                       }
+
+                       // CRITICAL: Log listener details for debugging gRPC xDS client connection issues
+                       klog.V(3).Infof("buildOutboundListeners: created ApiListener name=%s, address=%s:%d, routeName=%s",
+                               ll.Name, ll.Address.GetSocketAddress().Address, ll.Address.GetSocketAddress().GetPortValue(), routeName)
+
+                       out = append(out, &discovery.Resource{
+                               Name:     ll.Name,
+                               Resource: protoconv.MessageToAny(ll),
+                       })
+               }
+       }
+
        return out
 }
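
The cluster, route, and listener built above all share one naming scheme; a tiny sketch of the format referenced in the comments (the authoritative implementation is model.BuildSubsetKey, so this only documents the shape):

    package namingexample

    import "fmt"

    // outboundClusterName reproduces the "direction|port|subset|hostname" layout
    // described above, with an empty subset, e.g.
    //   outboundClusterName(7070, "consumer.grpc-proxyless.svc.cluster.local")
    //   => "outbound|7070||consumer.grpc-proxyless.svc.cluster.local"
    func outboundClusterName(port int, hostname string) string {
    	return fmt.Sprintf("outbound|%d||%s", port, hostname)
    }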
 
@@ -117,14 +347,13 @@ func buildInboundListeners(node *model.Proxy, push 
*model.PushContext, names []s
                }
        }
 
-       // If names are provided, use them; otherwise, generate listeners for 
all ServiceTargets
+       // Use the provided names - at this point names should not be empty
+       // (empty names are handled in BuildListeners to generate all listeners)
+       // This ensures we only generate listeners that were requested, 
preventing inconsistent behavior
        listenerNames := names
        if len(listenerNames) == 0 {
-               // Generate listener names from ServiceTargets
-               for port := range serviceInstancesByPort {
-                       listenerName := fmt.Sprintf("%s0.0.0.0:%d", 
grpcxds.ServerListenerNamePrefix, port)
-                       listenerNames = append(listenerNames, listenerName)
-               }
+               // This should not happen if BuildListeners logic is correct, 
but handle gracefully
+               return out
        }
 
        for _, name := range listenerNames {
@@ -141,28 +370,54 @@ func buildInboundListeners(node *model.Proxy, push 
*model.PushContext, names []s
                }
                si, ok := serviceInstancesByPort[uint32(listenPort)]
                if !ok {
-                       // If no service target found for this port, still create a minimal listener
-                       // This ensures we respond with at least one listener for connection initialization
-                       klog.V(2).Infof("%s has no service instance for port %s, creating minimal listener", node.ID, listenPortStr)
-                       ll := &listener.Listener{
-                               Name: name,
-                               Address: &core.Address{Address: &core.Address_SocketAddress{
-                                       SocketAddress: &core.SocketAddress{
-                                               Address: listenHost,
-                                               PortSpecifier: &core.SocketAddress_PortValue{
-                                                       PortValue: uint32(listenPort),
+                       // If no service target found for this port, don't create a listener
+                       // This prevents creating invalid listeners that would cause SERVING/NOT_SERVING cycles
+                       // The client should only request listeners for ports that are actually exposed by the service
+                       klog.Warningf("%s has no service instance for port %s, skipping listener %s. "+
+                               "This usually means the requested port doesn't match any service port in the pod's ServiceTargets.",
+                               node.ID, listenPortStr, name)
+                       continue
+               }
+
+               // For proxyless gRPC inbound listeners, we need a FilterChain with HttpConnectionManager filter
+               // to satisfy gRPC client requirements. According to grpc-go issue #7691 and the error
+               // "missing HttpConnectionManager filter", gRPC proxyless clients require HttpConnectionManager
+               // in the FilterChain for inbound listeners.
+               // Use inline RouteConfig instead of RDS to avoid triggering additional RDS requests that cause push loops
+               // For proxyless gRPC, inline configuration is preferred to minimize round-trips
+               routeName := fmt.Sprintf("%d", listenPort)
+               hcm := &hcmv3.HttpConnectionManager{
+                       CodecType:  hcmv3.HttpConnectionManager_AUTO,
+                       StatPrefix: fmt.Sprintf("inbound_%d", listenPort),
+                       RouteSpecifier: &hcmv3.HttpConnectionManager_RouteConfig{
+                               RouteConfig: &route.RouteConfiguration{
+                                       Name: routeName,
+                                       VirtualHosts: []*route.VirtualHost{
+                                               {
+                                                       Name:    "inbound|http|" + routeName,
+                                                       Domains: []string{"*"},
+                                                       Routes: []*route.Route{
+                                                               {
+                                                                       Match: &route.RouteMatch{
+                                                                               PathSpecifier: &route.RouteMatch_Prefix{
+                                                                                       Prefix: "/",
+                                                                               },
+                                                                       },
+                                                                       Action: &route.Route_NonForwardingAction{},
+                                                               },
+                                                       },
                                                },
                                        },
-                               }},
-                               FilterChains:    nil,
-                               ListenerFilters: nil,
-                               UseOriginalDst:  nil,
-                       }
-                       out = append(out, &discovery.Resource{
-                               Name:     ll.Name,
-                               Resource: protoconv.MessageToAny(ll),
-                       })
-                       continue
+                               },
+                       },
+                       HttpFilters: []*hcmv3.HttpFilter{
+                               {
+                                       Name: "envoy.filters.http.router",
+                                       ConfigType: &hcmv3.HttpFilter_TypedConfig{
+                                               TypedConfig: protoconv.MessageToAny(&routerv3.Router{}),
+                                       },
+                               },
+                       },
                }
 
                ll := &listener.Listener{
@@ -175,7 +430,19 @@ func buildInboundListeners(node *model.Proxy, push 
*model.PushContext, names []s
                                        },
                                },
                        }},
-                       FilterChains: nil,
+                       // Create FilterChain with HttpConnectionManager filter for proxyless gRPC
+                       FilterChains: []*listener.FilterChain{
+                               {
+                                       Filters: []*listener.Filter{
+                                               {
+                                                       Name: wellknown.HTTPConnectionManager,
+                                                       ConfigType: &listener.Filter_TypedConfig{
+                                                               TypedConfig: protoconv.MessageToAny(hcm),
+                                                       },
+                                               },
+                                       },
+                               },
+                       },
                        // the following must not be set or the client will NACK
                        ListenerFilters: nil,
                        UseOriginalDst:  nil,
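
Inbound (server) listeners are matched purely by name between the control plane and the proxyless gRPC server: the name generated here has to equal what the client derives from the bootstrap's server_listener_resource_name_template. A sketch of the pattern, with a placeholder prefix standing in for grpcxds.ServerListenerNamePrefix:

    package namingexample

    import "fmt"

    // serverListenerNamePrefix is a placeholder; the real value is
    // grpcxds.ServerListenerNamePrefix in pkg/dubbo-agent/grpcxds and must agree
    // with the server_listener_resource_name_template written into the bootstrap.
    const serverListenerNamePrefix = "example/grpc/lds/inbound/"

    // inboundListenerName mirrors the fmt.Sprintf("%s0.0.0.0:%d", ...) pattern in
    // buildInboundListeners: the proxyless gRPC server subscribes to exactly this
    // name, so both sides must produce the same string for the same port.
    func inboundListenerName(port int) string {
    	return fmt.Sprintf("%s0.0.0.0:%d", serverListenerNamePrefix, port)
    }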
diff --git a/sail/pkg/networking/grpcgen/rds.go 
b/sail/pkg/networking/grpcgen/rds.go
index dae825b4..4d5d770f 100644
--- a/sail/pkg/networking/grpcgen/rds.go
+++ b/sail/pkg/networking/grpcgen/rds.go
@@ -1,6 +1,10 @@
 package grpcgen
 
 import (
+       "fmt"
+       "strconv"
+       "strings"
+
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
        "github.com/apache/dubbo-kubernetes/sail/pkg/util/protoconv"
        route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
@@ -22,19 +26,87 @@ func (g *GrpcConfigGenerator) BuildHTTPRoutes(node 
*model.Proxy, push *model.Pus
 }
 
 func buildHTTPRoute(node *model.Proxy, push *model.PushContext, routeName string) *route.RouteConfiguration {
-       // TODO use route-style naming instead of cluster naming
-       _, _, hostname, port := model.ParseSubsetKey(routeName)
-       if hostname == "" || port == 0 {
-               klog.Warningf("failed to parse %v", routeName)
-               return nil
-       }
+       // For proxyless gRPC inbound routes, routeName is just the port number (e.g., "17070")
+       // For outbound routes, routeName is cluster format (outbound|port||hostname)
+       _, err := strconv.Atoi(routeName)
+       if err != nil {
+               // Try to parse as cluster naming format (outbound|port||hostname)
+               _, _, hostname, parsedPort := model.ParseSubsetKey(routeName)
+               if hostname == "" || parsedPort == 0 {
+                       klog.Warningf("failed to parse route name %v", routeName)
+                       return nil
+               }
+
+               // Build outbound route configuration for gRPC proxyless
+               // This is used by ApiListener to route traffic to the correct cluster
+               svc := push.ServiceForHostname(node, hostname)
+               if svc == nil {
+                       klog.Warningf("buildHTTPRoute: service not found for hostname %s", hostname)
+                       return nil
+               }
 
-       // virtualHosts, _, _ := core.BuildSidecarOutboundVirtualHosts(node, push, routeName, port, nil, &model.DisabledCache{})
+               // Build VirtualHost Domains for outbound route
+               // CRITICAL: Domains must match the xDS URL hostname for gRPC xDS client
+               hostStr := string(hostname)
+               domains := []string{
+                       fmt.Sprintf("%s:%d", hostStr, parsedPort), // FQDN with port - MOST SPECIFIC
+                       hostStr, // Full FQDN
+               }
+               // Add short name if different from FQDN
+               hostParts := strings.Split(hostStr, ".")
+               if len(hostParts) > 0 && hostParts[0] != hostStr {
+                       shortName := hostParts[0]
+                       domains = append(domains, fmt.Sprintf("%s:%d", shortName, parsedPort)) // Short name with port
+                       domains = append(domains, shortName)                                   // Short name
+               }
+               domains = append(domains, "*") // Wildcard for any domain - LEAST SPECIFIC
+
+               return &route.RouteConfiguration{
+                       Name: routeName,
+                       VirtualHosts: []*route.VirtualHost{
+                               {
+                                       Name:    fmt.Sprintf("%s|http|%d", hostStr, parsedPort),
+                                       Domains: domains,
+                                       Routes: []*route.Route{
+                                               {
+                                                       Match: &route.RouteMatch{
+                                                               PathSpecifier: &route.RouteMatch_Prefix{
+                                                                       Prefix: "/",
+                                                               },
+                                                       },
+                                                       Action: &route.Route_Route{
+                                                               Route: &route.RouteAction{
+                                                                       ClusterSpecifier: &route.RouteAction_Cluster{
+                                                                               Cluster: routeName, // Use routeName (cluster name)
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                               },
+                       },
+               }
+       }
 
-       // Only generate the required route for grpc. Will need to generate more
-       // as GRPC adds more features.
+       // Build minimal route configuration for proxyless gRPC inbound listener
+       // NonForwardingAction indicates this is an inbound listener that should handle requests directly
        return &route.RouteConfiguration{
-               Name:         routeName,
-               VirtualHosts: nil,
+               Name: routeName,
+               VirtualHosts: []*route.VirtualHost{
+                       {
+                               Name:    "inbound|http|" + routeName,
+                               Domains: []string{"*"},
+                               Routes: []*route.Route{
+                                       {
+                                               Match: &route.RouteMatch{
+                                                       PathSpecifier: &route.RouteMatch_Prefix{
+                                                               Prefix: "/",
+                                                       },
+                                               },
+                                               Action: &route.Route_NonForwardingAction{},
+                                       },
+                               },
+                       },
+               },
        }
 }
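
buildHTTPRoute now distinguishes the two route-name shapes up front; a small sketch of that classification (inbound names are bare ports, outbound names reuse the cluster key):

    package rdsexample

    import "strconv"

    // isInboundRouteName mirrors the branch at the top of buildHTTPRoute: a purely
    // numeric name (e.g. "17070") is an inbound route keyed by the listen port,
    // while anything else is parsed as an outbound cluster-style key such as
    // "outbound|7070||consumer.grpc-proxyless.svc.cluster.local".
    func isInboundRouteName(name string) bool {
    	_, err := strconv.Atoi(name)
    	return err == nil
    }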
diff --git a/sail/pkg/networking/util/util.go b/sail/pkg/networking/util/util.go
index 3da87664..d2061faa 100644
--- a/sail/pkg/networking/util/util.go
+++ b/sail/pkg/networking/util/util.go
@@ -2,12 +2,13 @@ package util
 
 import (
        "fmt"
+       "strings"
+
        "github.com/apache/dubbo-kubernetes/pkg/config/constants"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
        dubbonetworking "github.com/apache/dubbo-kubernetes/sail/pkg/networking"
        core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
        listener 
"github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
-       "strings"
 )
 
 const (
diff --git a/sail/pkg/serviceregistry/kube/controller/controller.go 
b/sail/pkg/serviceregistry/kube/controller/controller.go
index b318ebdb..1cece128 100644
--- a/sail/pkg/serviceregistry/kube/controller/controller.go
+++ b/sail/pkg/serviceregistry/kube/controller/controller.go
@@ -19,6 +19,7 @@ package controller
 
 import (
        "sort"
+       "strings"
        "sync"
        "time"
 
@@ -48,6 +49,7 @@ import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        klabels "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/types"
+       "k8s.io/apimachinery/pkg/util/intstr"
        "k8s.io/klog/v2"
 )
 
@@ -201,6 +203,41 @@ func (c *Controller) GetProxyServiceTargets(proxy 
*model.Proxy) []model.ServiceT
                proxyNamespace = proxy.Metadata.Namespace
        }
 
+       // Get pod by proxy IP address to check if service selector matches pod labels
+       // Also get pod to resolve targetPort from container ports
+       // If IP is empty, try to find pod by node ID (format: podname.namespace)
+       var podLabels labels.Instance
+       var pod *v1.Pod
+       if len(proxy.IPAddresses) > 0 {
+               pods := c.pods.getPodsByIP(proxy.IPAddresses[0])
+               if len(pods) > 0 {
+                       // Use the first pod's labels (in most cases there should be only one pod per IP)
+                       pod = pods[0]
+                       podLabels = labels.Instance(pod.Labels)
+               }
+       } else if proxy.ID != "" {
+               // If IP address is empty, try to find pod by node ID
+               // Node ID format for proxyless is: podname.namespace
+               parts := strings.Split(proxy.ID, ".")
+               if len(parts) >= 2 {
+                       podName := parts[0]
+                       podNamespace := parts[1]
+                       // Try to get pod by name and namespace
+                       podKey := types.NamespacedName{Name: podName, Namespace: podNamespace}
+                       pod = c.pods.getPodByKey(podKey)
+                       if pod != nil {
+                               // Extract IP from pod and set it to proxy
+                               if pod.Status.PodIP != "" {
+                                       proxy.IPAddresses = []string{pod.Status.PodIP}
+                                       klog.V(2).Infof("GetProxyServiceTargets: set proxy IP from pod %s/%s: %s", podNamespace, podName, pod.Status.PodIP)
+                               }
+                               podLabels = labels.Instance(pod.Labels)
+                       } else {
+                               klog.V(2).Infof("GetProxyServiceTargets: pod %s/%s not found by node ID", podNamespace, podName)
+                       }
+               }
+       }
+
        out := make([]model.ServiceTarget, 0)
        c.RLock()
        defer c.RUnlock()
@@ -236,16 +273,65 @@ func (c *Controller) GetProxyServiceTargets(proxy *model.Proxy) []model.ServiceT
                        continue
                }
 
+               // For services in the same namespace, check if the service selector matches pod labels
+               // This ensures we only return services that actually select this pod
+               if svc.Attributes.Namespace == proxyNamespace && podLabels != nil {
+                       serviceSelector := labels.Instance(svc.Attributes.LabelSelectors)
+                       if len(serviceSelector) > 0 {
+                               // If service has a selector, check if it matches pod labels
+                               if !serviceSelector.Match(podLabels) {
+                                       // Service selector doesn't match pod labels, skip this service
+                                       continue
+                               }
+                       }
+               }
+
+               // Get the original Kubernetes Service to resolve targetPort
+               var kubeSvc *v1.Service
+               if c.services != nil {
+                       kubeSvc = c.services.Get(svc.Attributes.Name, svc.Attributes.Namespace)
+               }
+
                // Create a ServiceTarget for each port
                for _, port := range svc.Ports {
                        if port == nil {
                                continue
                        }
+                       targetPort := uint32(port.Port) // Default to service port
+
+                       // Try to resolve actual targetPort from Kubernetes Service and Pod
+                       if kubeSvc != nil {
+                               // Find the matching ServicePort in Kubernetes Service
+                               for _, kubePort := range kubeSvc.Spec.Ports {
+                                       if kubePort.Name == port.Name && int32(kubePort.Port) == int32(port.Port) {
+                                               // Resolve targetPort from ServicePort.TargetPort
+                                               if kubePort.TargetPort.Type == intstr.Int {
+                                                       // TargetPort is a number
+                                                       targetPort = uint32(kubePort.TargetPort.IntVal)
+                                               } else if kubePort.TargetPort.Type == intstr.String && pod != nil {
+                                                       // TargetPort is a string (port name), resolve from Pod container ports
+                                                       for _, container := range pod.Spec.Containers {
+                                                               for _, containerPort := range container.Ports {
+                                                                       if containerPort.Name == kubePort.TargetPort.StrVal {
+                                                                               targetPort = uint32(containerPort.ContainerPort)
+                                                                               break
+                                                                       }
+                                                               }
+                                                               if targetPort != uint32(port.Port) {
+                                                                       break
+                                                               }
+                                                       }
+                                               }
+                                               break
+                                       }
+                               }
+                       }
+
                        target := model.ServiceTarget{
                                Service: svc,
                                Port: model.ServiceInstancePort{
                                        ServicePort: port,
-                                       TargetPort:  uint32(port.Port),
+                                       TargetPort:  targetPort,
                                },
                        }
                        out = append(out, target)
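
The targetPort resolution above follows the usual Kubernetes rules: a numeric targetPort is used as-is, a named targetPort is looked up in the pod's container ports, and the service port is the fallback. A minimal sketch of that rule, assuming only the upstream Kubernetes API types; resolveTargetPort is an illustrative helper, not part of the controller:

    package main

    import (
        "fmt"

        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/intstr"
    )

    // resolveTargetPort returns the container port a ServicePort ultimately points at.
    func resolveTargetPort(sp corev1.ServicePort, pod *corev1.Pod) int32 {
        switch sp.TargetPort.Type {
        case intstr.Int:
            if sp.TargetPort.IntVal != 0 {
                return sp.TargetPort.IntVal
            }
        case intstr.String:
            if pod != nil {
                for _, c := range pod.Spec.Containers {
                    for _, cp := range c.Ports {
                        if cp.Name == sp.TargetPort.StrVal {
                            return cp.ContainerPort
                        }
                    }
                }
            }
        }
        return sp.Port // fallback: the service port itself
    }

    func main() {
        sp := corev1.ServicePort{Name: "grpc", Port: 8080, TargetPort: intstr.FromString("grpc-port")}
        pod := &corev1.Pod{Spec: corev1.PodSpec{Containers: []corev1.Container{{
            Ports: []corev1.ContainerPort{{Name: "grpc-port", ContainerPort: 17070}},
        }}}}
        fmt.Println(resolveTargetPort(sp, pod)) // 17070
    }
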
diff --git a/sail/pkg/serviceregistry/kube/controller/endpointslice.go b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
index 0af5fa38..4c381a7b 100644
--- a/sail/pkg/serviceregistry/kube/controller/endpointslice.go
+++ b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
@@ -1,6 +1,10 @@
 package controller
 
 import (
+       "fmt"
+       "strings"
+       "sync"
+
        "github.com/apache/dubbo-kubernetes/pkg/config"
        "github.com/apache/dubbo-kubernetes/pkg/config/host"
        "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
@@ -16,9 +20,9 @@ import (
        klabels "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/selection"
        "k8s.io/apimachinery/pkg/types"
+       "k8s.io/apimachinery/pkg/util/intstr"
+       "k8s.io/klog/v2"
        mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
-       "strings"
-       "sync"
 )
 
 var (
@@ -180,6 +184,21 @@ func (esc *endpointSliceController) 
updateEndpointCacheForSlice(hostName host.Na
                // Draining tracking is only enabled if persistent sessions is 
enabled.
                // If we start using them for other features, this can be 
adjusted.
                healthStatus := endpointHealthStatus(svc, e)
+
+               // Log the resolved health status for debugging; Warningf keeps it visible at the default log level
+               if len(e.Addresses) > 0 {
+                       ready := "nil"
+                       terminating := "nil"
+                       if e.Conditions.Ready != nil {
+                               ready = fmt.Sprintf("%v", *e.Conditions.Ready)
+                       }
+                       if e.Conditions.Terminating != nil {
+                               terminating = fmt.Sprintf("%v", *e.Conditions.Terminating)
+                       }
+                       klog.Warningf("endpointHealthStatus: address=%s, Ready=%s, Terminating=%s, HealthStatus=%v, svc=%v",
+                               e.Addresses[0], ready, terminating, healthStatus, svc != nil)
+               }
+
                for _, a := range e.Addresses {
                        pod, expectedPod := getPod(esc.c, a, 
&metav1.ObjectMeta{Name: epSlice.Name, Namespace: epSlice.Namespace}, 
e.TargetRef, hostName)
                        if pod == nil && expectedPod {
@@ -189,17 +208,162 @@ func (esc *endpointSliceController) 
updateEndpointCacheForSlice(hostName host.Na
                        var overrideAddresses []string
                        builder := esc.c.NewEndpointBuilder(pod)
                        // EDS and ServiceEntry use name for service port - ADS 
will need to map to numbers.
+                       // CRITICAL FIX: Always use Service.Port.Name as source 
of truth for ServicePortName
+                       // - Use EndpointSlice.Port.Port (service port number) 
as endpointPort
+                       // - Always resolve portName from Service by matching 
port number (Service is source of truth)
+                       // - This ensures ep.ServicePortName matches 
svcPort.Name in BuildClusterLoadAssignment
+                       kubeSvc := esc.c.services.Get(svcNamespacedName.Name, 
svcNamespacedName.Namespace)
                        for _, port := range epSlice.Ports {
-                               var portNum int32
+                               var epSlicePortNum int32
                                if port.Port != nil {
-                                       portNum = *port.Port
+                                       epSlicePortNum = *port.Port
                                }
-                               var portName string
+                               var epSlicePortName string
                                if port.Name != nil {
-                                       portName = *port.Name
+                                       epSlicePortName = *port.Name
+                               }
+
+                               var servicePortNum int32
+                               var targetPortNum int32
+                               var portName string
+
+                               // CRITICAL FIX: EndpointSlice.Port.Port might 
be targetPort or service port
+                               // We need to find the matching ServicePort and 
resolve:
+                               // 1. servicePortNum (Service.Port) - used for 
matching in BuildClusterLoadAssignment
+                               // 2. targetPortNum (Service.TargetPort) - used 
as EndpointPort
+                               // 3. portName (Service.Port.Name) - used as 
ServicePortName for filtering
+                               if kubeSvc != nil {
+                                       matched := false
+                                       for _, kubePort := range 
kubeSvc.Spec.Ports {
+                                               // Try matching by port number 
(service port or targetPort)
+                                               portMatches := 
int32(kubePort.Port) == epSlicePortNum
+
+                                               // Also try matching by 
targetPort
+                                               var kubeTargetPort int32
+                                               if kubePort.TargetPort.Type == 
intstr.Int {
+                                                       kubeTargetPort = 
kubePort.TargetPort.IntVal
+                                               } else if 
kubePort.TargetPort.Type == intstr.String && pod != nil {
+                                                       // Resolve targetPort 
name from Pod
+                                                       for _, container := 
range pod.Spec.Containers {
+                                                               for _, 
containerPort := range container.Ports {
+                                                                       if 
containerPort.Name == kubePort.TargetPort.StrVal {
+                                                                               
kubeTargetPort = containerPort.ContainerPort
+                                                                               
break
+                                                                       }
+                                                               }
+                                                               if 
kubeTargetPort != 0 {
+                                                                       break
+                                                               }
+                                                       }
+                                               }
+                                               targetPortMatches := 
kubeTargetPort == epSlicePortNum
+
+                                               // Match by port name if 
available
+                                               nameMatches := epSlicePortName 
== "" || kubePort.Name == epSlicePortName
+
+                                               if (portMatches || 
targetPortMatches) && (epSlicePortName == "" || nameMatches) {
+                                                       // Found matching 
ServicePort
+                                                       servicePortNum = 
int32(kubePort.Port)
+                                                       portName = kubePort.Name
+
+                                                       // Resolve targetPortNum
+                                                       if 
kubePort.TargetPort.Type == intstr.Int {
+                                                               targetPortNum = 
kubePort.TargetPort.IntVal
+                                                       } else if 
kubePort.TargetPort.Type == intstr.String && pod != nil {
+                                                               // Resolve 
targetPort name from Pod container ports
+                                                               for _, 
container := range pod.Spec.Containers {
+                                                                       for _, 
containerPort := range container.Ports {
+                                                                               
if containerPort.Name == kubePort.TargetPort.StrVal {
+                                                                               
        targetPortNum = containerPort.ContainerPort
+                                                                               
        break
+                                                                               
}
+                                                                       }
+                                                                       if 
targetPortNum != 0 {
+                                                                               
break
+                                                                       }
+                                                               }
+                                                       } else {
+                                                               // If 
targetPort is string but pod not found, use service port as fallback
+                                                               targetPortNum = 
servicePortNum
+                                                       }
+
+                                                       // If targetPortNum is 
still 0, use service port
+                                                       if targetPortNum == 0 {
+                                                               targetPortNum = 
servicePortNum
+                                                       }
+
+                                                       matched = true
+                                                       klog.V(2).Infof("updateEndpointCacheForSlice: matched ServicePort (servicePort=%d, targetPort=%d, portName='%s') for EndpointSlice.Port (portNum=%d, portName='%s')",
+                                                               servicePortNum, targetPortNum, portName, epSlicePortNum, epSlicePortName)
+                                                       break
+                                               }
+                                       }
+
+                                       if !matched {
+                                               klog.V(2).Infof("updateEndpointCacheForSlice: failed to match EndpointSlice.Port (portNum=%d, portName='%s') with Service %s, using EndpointSlice values",
+                                                       epSlicePortNum, epSlicePortName, svcNamespacedName.Name)
+                                               // Fallback: use EndpointSlice 
values
+                                               servicePortNum = epSlicePortNum
+                                               targetPortNum = epSlicePortNum
+                                               if epSlicePortName != "" {
+                                                       portName = 
epSlicePortName
+                                               }
+                                       }
+                               } else {
+                                       // Service not found, use EndpointSlice 
values
+                                       servicePortNum = epSlicePortNum
+                                       targetPortNum = epSlicePortNum
+                                       if epSlicePortName != "" {
+                                               portName = epSlicePortName
+                                       }
+                                       klog.V(2).Infof("updateEndpointCacheForSlice: Service not found for %s, using EndpointSlice values (portNum=%d, portName='%s')",
+                                               svcNamespacedName.Name, epSlicePortNum, epSlicePortName)
                                }
 
-                               dubboEndpoint := builder.buildDubboEndpoint(a, 
portNum, portName, nil, healthStatus, svc.SupportsUnhealthyEndpoints())
+                               // Log endpoint creation with the resolved values for debugging
+                               klog.V(2).Infof("updateEndpointCacheForSlice: creating endpoint for service %s (address=%s, servicePortNum=%d, targetPortNum=%d, portName='%s', hostname=%s, kubeSvc=%v)",
+                                       svcNamespacedName.Name, a, servicePortNum, targetPortNum, portName, hostName, kubeSvc != nil)
+
+                               // CRITICAL FIX: According to Istio's 
implementation and Kubernetes EndpointSlice spec:
+                               // - EndpointSlice.Port.Port should be the 
Service Port (not targetPort)
+                               // - But IstioEndpoint.EndpointPort should be 
the targetPort (container port)
+                               // - ServicePortName should match 
Service.Port.Name for filtering in BuildClusterLoadAssignment
+                               //
+                               // We use targetPortNum as EndpointPort because 
that's what the container actually listens on.
+                               // The servicePortNum is used for matching in 
BuildClusterLoadAssignment via portName.
+                               //
+                               // CRITICAL: svc.SupportsUnhealthyEndpoints() 
returns true if Service has publishNotReadyAddresses=true
+                               // This allows endpoints with Ready=false to be 
included in EDS, which is useful for services
+                               // that need to receive traffic even before 
they are fully ready (e.g., during startup).
+                               // CRITICAL FIX: Check if svc is nil before 
calling SupportsUnhealthyEndpoints
+                               var supportsUnhealthy bool
+                               if svc != nil {
+                                       supportsUnhealthy = 
svc.SupportsUnhealthyEndpoints()
+                               } else {
+                                       // If service is nil, default to false 
(don't support unhealthy endpoints)
+                                       supportsUnhealthy = false
+                               }
+                               dubboEndpoint := builder.buildDubboEndpoint(a, 
targetPortNum, portName, nil, healthStatus, supportsUnhealthy)
+
+                               // Log when an endpoint is unhealthy and the service does not support unhealthy endpoints
+                               if healthStatus == model.UnHealthy && !supportsUnhealthy {
+                                       if svc != nil {
+                                               klog.V(2).Infof("updateEndpointCacheForSlice: endpoint %s is unhealthy (HealthStatus=%v) but service %s does not support unhealthy endpoints (PublishNotReadyAddresses=%v). Endpoint will be filtered in EDS.",
+                                                       a, healthStatus, svcNamespacedName.Name, svc.Attributes.PublishNotReadyAddresses)
+                                       } else {
+                                               klog.V(2).Infof("updateEndpointCacheForSlice: endpoint %s is unhealthy (HealthStatus=%v) but service %s is nil. Endpoint will be filtered in EDS.",
+                                                       a, healthStatus, svcNamespacedName.Name)
+                                       }
+                               }
+
+                               // Verify the endpoint was created with the correct ServicePortName
+                               if dubboEndpoint != nil {
+                                       klog.V(2).Infof("updateEndpointCacheForSlice: created endpoint with ServicePortName='%s', EndpointPort=%d, address=%s",
+                                               dubboEndpoint.ServicePortName, dubboEndpoint.EndpointPort, dubboEndpoint.FirstAddressOrNil())
+                               } else {
+                                       klog.Errorf("updateEndpointCacheForSlice: buildDubboEndpoint returned nil for address=%s, targetPortNum=%d, portName='%s'",
+                                               a, targetPortNum, portName)
+                               }
                                if len(overrideAddresses) > 1 {
                                        dubboEndpoint.Addresses = 
overrideAddresses
                                }
@@ -233,16 +397,25 @@ func (e *endpointSliceCache) update(hostname host.Name, 
slice string, endpoints
 }
 
 func endpointHealthStatus(svc *model.Service, e v1.Endpoint) 
model.HealthStatus {
+       // CRITICAL FIX: Correct health status logic
+       // 1. If Ready is nil or true, endpoint is healthy
+       // 2. If Ready is false, check if it's terminating
+       // 3. If terminating, mark as Terminating
+       // 4. Otherwise, mark as UnHealthy
+
+       // Check Ready condition
        if e.Conditions.Ready == nil || *e.Conditions.Ready {
                return model.Healthy
        }
 
-       // If it is shutting down, mark it as terminating. This occurs regardless of whether it was previously healthy or not.
-       if svc != nil &&
-               (e.Conditions.Terminating == nil || *e.Conditions.Terminating) {
+       // Ready is false, check if it's terminating
+       // CRITICAL FIX: Terminating should be checked only if it's not nil AND true
+       // If Terminating is nil, it means the endpoint is not terminating (it's just not ready)
+       if e.Conditions.Terminating != nil && *e.Conditions.Terminating {
                return model.Terminating
        }
 
+       // Ready is false and not terminating, mark as unhealthy
        return model.UnHealthy
 }
 
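
The reworked endpointHealthStatus maps the EndpointSlice conditions as follows: Ready nil or true means healthy, Ready false with Terminating explicitly true means terminating, and Ready false otherwise means unhealthy. A self-contained sketch of that mapping, with local types standing in for the model package:

    package main

    import "fmt"

    type healthStatus string

    const (
        healthy     healthStatus = "Healthy"
        unhealthy   healthStatus = "UnHealthy"
        terminating healthStatus = "Terminating"
    )

    func status(ready, term *bool) healthStatus {
        if ready == nil || *ready {
            return healthy
        }
        if term != nil && *term {
            return terminating
        }
        return unhealthy
    }

    func main() {
        t, f := true, false
        fmt.Println(status(nil, nil)) // Healthy
        fmt.Println(status(&f, nil))  // UnHealthy (not yet ready, not terminating)
        fmt.Println(status(&f, &t))   // Terminating
        fmt.Println(status(&t, &f))   // Healthy
    }
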
diff --git a/sail/pkg/xds/ads.go b/sail/pkg/xds/ads.go
index e0839467..0d566fc3 100644
--- a/sail/pkg/xds/ads.go
+++ b/sail/pkg/xds/ads.go
@@ -18,6 +18,7 @@
 package xds
 
 import (
+       "fmt"
        "strconv"
        "strings"
        "sync/atomic"
@@ -247,7 +248,10 @@ func (s *DiscoveryServer) Stream(stream DiscoveryStream) 
error {
                peerAddr = peerInfo.Addr.String()
        }
 
-       // TODO WaitForRequestLimit?
+       if err := s.WaitForRequestLimit(stream.Context()); err != nil {
+               klog.Warningf("ADS: %q exceeded rate limit: %v", peerAddr, err)
+               return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err)
+       }
 
        ids, err := s.authenticate(ctx)
        if err != nil {
@@ -313,8 +317,13 @@ func (s *DiscoveryServer) pushConnection(con *Connection, 
pushEv *Event) error {
 
 func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con 
*Connection) error {
        stype := v3.GetShortType(req.TypeUrl)
-       klog.V(2).Infof("ADS:%s: REQ %s resources:%d nonce:%s ", stype,
-               con.ID(), len(req.ResourceNames), req.ResponseNonce)
+       // Log requested resource names
+       resourceNamesStr := ""
+       if len(req.ResourceNames) > 0 {
+               resourceNamesStr = fmt.Sprintf(" [%s]", strings.Join(req.ResourceNames, ", "))
+       }
+       klog.Infof("ADS:%s: REQ %s resources:%d nonce:%s%s", stype,
+               con.ID(), len(req.ResourceNames), req.ResponseNonce, resourceNamesStr)
        if req.TypeUrl == v3.HealthInfoType {
                return nil
        }
@@ -330,6 +339,20 @@ func (s *DiscoveryServer) processRequest(req 
*discovery.DiscoveryRequest, con *C
                return nil
        }
 
+       // CRITICAL FIX: For proxyless gRPC, a wildcard request (empty ResourceNames) that arrives after
+       // specific resources were already sent is an ACK, so we should NOT push all resources again.
+       // ShouldRespond should already have filtered this out; the check here is a safety net.
+       watchedResource := con.proxy.GetWatchedResource(req.TypeUrl)
+       if con.proxy.IsProxylessGrpc() && len(req.ResourceNames) == 0 && watchedResource != nil && len(watchedResource.ResourceNames) > 0 && watchedResource.NonceSent != "" {
+               // Wildcard ACK after specific resources were sent: log and skip the push
+               klog.V(2).Infof("%s: proxyless gRPC wildcard ACK after specific resources (prev: %d resources, nonce: %s), skipping push",
+                       stype, len(watchedResource.ResourceNames), watchedResource.NonceSent)
+               return nil
+       }
+
        // Log PUSH request BEFORE calling pushXds
        if len(req.ResourceNames) > 0 {
                klog.Infof("%s: PUSH request for node:%s resources:%d", stype, 
con.ID(), len(req.ResourceNames))
@@ -337,15 +360,30 @@ func (s *DiscoveryServer) processRequest(req 
*discovery.DiscoveryRequest, con *C
                klog.Infof("%s: PUSH request for node:%s", stype, con.ID())
        }
 
+       // Don't set Forced=true for regular proxy requests to avoid 
unnecessary ServiceTargets recomputation
+       // Only set Forced for debug requests or when explicitly needed
+       // This prevents ServiceTargets from being recomputed on every request, 
which can cause
+       // inconsistent listener generation and push loops
        request := &model.PushRequest{
                Full:   true,
                Push:   con.proxy.LastPushContext,
                Reason: model.NewReasonStats(model.ProxyRequest),
                Start:  con.proxy.LastPushTime,
                Delta:  delta,
-               Forced: true,
+               Forced: false, // Only recompute ServiceTargets when 
ConfigsUpdated indicates service changes
+       }
+
+       // CRITICAL FIX: Get WatchedResource after ShouldRespond has created it
+       // ShouldRespond may have created a new WatchedResource for first-time requests
+       w := con.proxy.GetWatchedResource(req.TypeUrl)
+       if w == nil {
+               // This should not happen if ShouldRespond returned true, but handle it gracefully
+               klog.Warningf("%s: WatchedResource is nil for %s after ShouldRespond returned true, creating it", stype, con.ID())
+               con.proxy.NewWatchedResource(req.TypeUrl, req.ResourceNames)
+               w = con.proxy.GetWatchedResource(req.TypeUrl)
        }
-       return s.pushXds(con, con.proxy.GetWatchedResource(req.TypeUrl), 
request)
+
+       return s.pushXds(con, w, request)
 }
 
 func newConnection(peerAddr string, stream DiscoveryStream) *Connection {
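
Both the rate-limit check and the wildcard-ACK handling above depend on telling an ACK apart from a request that still needs a response. A rough, simplified sketch of the state-of-the-world ACK check that ShouldRespond is expected to perform before processRequest runs; watchState is an illustrative stand-in for the connection's WatchedResource, not a real type in this repo:

    package main

    import (
        "fmt"

        discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    )

    type watchState struct {
        nonceSent string
        resources map[string]struct{}
    }

    func isAck(req *discovery.DiscoveryRequest, w watchState) bool {
        if req.ResponseNonce == "" || req.ResponseNonce != w.nonceSent {
            return false // first request or stale nonce: respond
        }
        if req.ErrorDetail != nil {
            return false // NACK: the client rejected the previous push
        }
        for _, name := range req.ResourceNames {
            if _, ok := w.resources[name]; !ok {
                return false // new subscription: respond with the added resource
            }
        }
        return true
    }

    func main() {
        w := watchState{nonceSent: "v1-abc", resources: map[string]struct{}{"outbound|8080||producer": {}}}
        req := &discovery.DiscoveryRequest{ResponseNonce: "v1-abc", ResourceNames: []string{"outbound|8080||producer"}}
        fmt.Println(isAck(req, w)) // true
    }
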
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index 6517c644..a77cf866 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -243,8 +243,17 @@ func (s *DiscoveryServer) dropCacheForRequest(req 
*model.PushRequest) {
        // If we don't know what updated, cannot safely cache. Clear the whole 
cache
        if req.Forced {
                s.Cache.ClearAll()
+               klog.V(2).Infof("dropCacheForRequest: cleared all cache 
(Forced=true)")
        } else {
                // Otherwise, just clear the updated configs
+               // CRITICAL: Log cache clear for debugging
+               if len(req.ConfigsUpdated) > 0 {
+                       configs := make([]string, 0, len(req.ConfigsUpdated))
+                       for ckey := range req.ConfigsUpdated {
+                               configs = append(configs, ckey.String())
+                       }
+                       klog.V(3).Infof("dropCacheForRequest: clearing cache 
for configs: %v", configs)
+               }
                s.Cache.Clear(req.ConfigsUpdated)
        }
 }
diff --git a/sail/pkg/xds/eds.go b/sail/pkg/xds/eds.go
index cbed1d15..f33f1d4c 100644
--- a/sail/pkg/xds/eds.go
+++ b/sail/pkg/xds/eds.go
@@ -8,7 +8,9 @@ import (
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
        "github.com/apache/dubbo-kubernetes/sail/pkg/util/protoconv"
        "github.com/apache/dubbo-kubernetes/sail/pkg/xds/endpoints"
+       endpoint 
"github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
        discovery 
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+       "k8s.io/klog/v2"
 )
 
 var _ model.XdsDeltaResourceGenerator = &EdsGenerator{}
@@ -105,17 +107,45 @@ func (eds *EdsGenerator) buildEndpoints(proxy 
*model.Proxy,
        // see 
https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#grouping-resources-into-responses
        if !req.Full || canSendPartialFullPushes(req) {
                edsUpdatedServices = 
model.ConfigNamesOfKind(req.ConfigsUpdated, kind.ServiceEntry)
+               if len(edsUpdatedServices) > 0 {
+                       klog.V(3).Infof("buildEndpoints: edsUpdatedServices 
count=%d (Full=%v)", len(edsUpdatedServices), req.Full)
+               }
        }
        var resources model.Resources
        empty := 0
        cached := 0
        regenerated := 0
        for clusterName := range w.ResourceNames {
+               hostname := model.ParseSubsetKeyHostname(clusterName)
+               // CRITICAL FIX: Always check if this service was updated, even 
if edsUpdatedServices is nil
+               // For incremental pushes, we need to check ConfigsUpdated 
directly to ensure cache is cleared
+               serviceWasUpdated := false
+               if req.ConfigsUpdated != nil {
+                       // CRITICAL: Log all ConfigsUpdated entries for 
debugging hostname matching
+                       configKeys := make([]string, 0, len(req.ConfigsUpdated))
+                       for ckey := range req.ConfigsUpdated {
+                               configKeys = append(configKeys, 
fmt.Sprintf("%s/%s/%s", ckey.Kind, ckey.Namespace, ckey.Name))
+                               if ckey.Kind == kind.ServiceEntry {
+                                       // CRITICAL: Match ConfigsUpdated.Name 
with hostname
+                                       // Both should be FQDN (e.g., 
"consumer.grpc-proxyless.svc.cluster.local")
+                                       if ckey.Name == hostname {
+                                               serviceWasUpdated = true
+                                               
klog.V(2).Infof("buildEndpoints: service %s was updated, forcing regeneration", 
hostname)
+                                               break
+                                       }
+                               }
+                       }
+               }
+
                if edsUpdatedServices != nil {
-                       if _, ok := 
edsUpdatedServices[model.ParseSubsetKeyHostname(clusterName)]; !ok {
+                       if _, ok := edsUpdatedServices[hostname]; !ok {
                                // Cluster was not updated, skip recomputing. 
This happens when we get an incremental update for a
                                // specific Hostname. On connect or for full 
push edsUpdatedServices will be empty.
-                               continue
+                               if !serviceWasUpdated {
+                                       continue
+                               }
+                       } else {
+                               serviceWasUpdated = true
                        }
                }
                builder := endpoints.NewEndpointBuilder(clusterName, proxy, 
req.Push)
@@ -123,8 +153,25 @@ func (eds *EdsGenerator) buildEndpoints(proxy *model.Proxy,
                        continue
                }
 
-               // Try to get from cache
-               if eds.Cache != nil {
+               // CRITICAL FIX: For incremental pushes (Full=false), we must 
always regenerate EDS
+               // if ConfigsUpdated contains this service, because endpoint 
health status may have changed.
+               // The cache may contain stale empty endpoints from when the 
endpoint was unhealthy.
+               // For full pushes, we can use cache if the service was not 
updated.
+               // CRITICAL: If this is an incremental push and ConfigsUpdated 
contains any ServiceEntry,
+               // we must check if it matches this hostname. If it does, force 
regeneration.
+               shouldRegenerate := serviceWasUpdated
+               if !shouldRegenerate && !req.Full && req.ConfigsUpdated != nil {
+                       // For incremental pushes, check if any ServiceEntry in 
ConfigsUpdated matches this hostname
+                       for ckey := range req.ConfigsUpdated {
+                               if ckey.Kind == kind.ServiceEntry && ckey.Name 
== hostname {
+                                       shouldRegenerate = true
+                                       break
+                               }
+                       }
+               }
+
+               // Try to get from cache only if we don't need to regenerate
+               if !shouldRegenerate && eds.Cache != nil {
                        cachedEndpoint := eds.Cache.Get(builder)
                        if cachedEndpoint != nil {
                                resources = append(resources, cachedEndpoint)
@@ -135,8 +182,17 @@ func (eds *EdsGenerator) buildEndpoints(proxy *model.Proxy,
 
                // generate eds from beginning
                l := builder.BuildClusterLoadAssignment(eds.EndpointIndex)
+               // CRITICAL FIX: Even if endpoints are empty (l == nil), we 
should still create an empty ClusterLoadAssignment
+               // to ensure the client receives the EDS response. This is 
necessary for proxyless gRPC clients
+               // to know the endpoint state, even if it's empty initially.
+               // Note: BuildClusterLoadAssignment returns nil when no 
endpoints are found,
+               // but we still need to send an empty ClusterLoadAssignment to 
the client.
                if l == nil {
-                       continue
+                       // Build an empty ClusterLoadAssignment for this cluster
+                       // Use the same logic as 
endpoint_builder.go:buildEmptyClusterLoadAssignment
+                       l = &endpoint.ClusterLoadAssignment{
+                               ClusterName: clusterName,
+                       }
                }
                regenerated++
 
diff --git a/sail/pkg/xds/endpoints/endpoint_builder.go b/sail/pkg/xds/endpoints/endpoint_builder.go
index 4635dcff..aa5a870d 100644
--- a/sail/pkg/xds/endpoints/endpoint_builder.go
+++ b/sail/pkg/xds/endpoints/endpoint_builder.go
@@ -20,11 +20,14 @@ package endpoints
 import (
        "github.com/apache/dubbo-kubernetes/pkg/config/host"
        "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
        "github.com/apache/dubbo-kubernetes/sail/pkg/networking/util"
+       "github.com/cespare/xxhash/v2"
        core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
        endpoint 
"github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
        "google.golang.org/protobuf/types/known/wrapperspb"
+       "k8s.io/klog/v2"
 )
 
 var _ model.XdsCacheEntry = &EndpointBuilder{}
@@ -80,9 +83,22 @@ func (b *EndpointBuilder) 
BuildClusterLoadAssignment(endpointIndex *model.Endpoi
        // Get endpoints from the endpoint index
        shards, ok := endpointIndex.ShardsForService(string(b.hostname), 
b.service.Attributes.Namespace)
        if !ok {
+               klog.Warningf("BuildClusterLoadAssignment: no shards found for 
service %s in namespace %s (cluster=%s, port=%d, svcPort.Name='%s', 
svcPort.Port=%d)",
+                       b.hostname, b.service.Attributes.Namespace, 
b.clusterName, b.port, svcPort.Name, svcPort.Port)
                return buildEmptyClusterLoadAssignment(b.clusterName)
        }
 
+       // CRITICAL: Log shards info before processing
+       shards.RLock()
+       shardCount := len(shards.Shards)
+       totalEndpointsInShards := 0
+       for _, eps := range shards.Shards {
+               totalEndpointsInShards += len(eps)
+       }
+       shards.RUnlock()
+       klog.V(3).Infof("BuildClusterLoadAssignment: found shards for service 
%s (cluster=%s, port=%d, shardCount=%d, totalEndpointsInShards=%d)",
+               b.hostname, b.clusterName, b.port, shardCount, 
totalEndpointsInShards)
+
        // Build port map for filtering
        portMap := map[string]int{}
        for _, port := range b.service.Ports {
@@ -99,10 +115,17 @@ func (b *EndpointBuilder) 
BuildClusterLoadAssignment(endpointIndex *model.Endpoi
 
        // Filter and convert endpoints
        var lbEndpoints []*endpoint.LbEndpoint
+       var filteredCount int
+       var totalEndpoints int
+
        for _, eps := range shards.Shards {
                for _, ep := range eps {
+                       totalEndpoints++
                        // Filter by port name
                        if ep.ServicePortName != svcPort.Name {
+                               filteredCount++
+                               klog.V(3).Infof("BuildClusterLoadAssignment: 
filtering out endpoint %s (port name mismatch: '%s' != '%s')",
+                                       ep.FirstAddressOrNil(), 
ep.ServicePortName, svcPort.Name)
                                continue
                        }
 
@@ -112,19 +135,33 @@ func (b *EndpointBuilder) 
BuildClusterLoadAssignment(endpointIndex *model.Endpoi
                        }
 
                        // Filter out unhealthy endpoints unless service 
supports them
+                       // CRITICAL: Even if endpoint is unhealthy, we should 
include it if:
+                       // 1. Service has publishNotReadyAddresses=true 
(SupportsUnhealthyEndpoints returns true)
+                       // 2. Or if the endpoint is actually healthy 
(HealthStatus=Healthy)
+                       //
+                       // For gRPC proxyless, we may want to include endpoints 
even if they're not ready initially,
+                       // as the client will handle connection failures. 
However, this is controlled by the service
+                       // configuration (publishNotReadyAddresses).
                        if !b.service.SupportsUnhealthyEndpoints() && 
ep.HealthStatus == model.UnHealthy {
+                               klog.V(3).Infof("BuildClusterLoadAssignment: 
filtering out unhealthy endpoint %s", ep.FirstAddressOrNil())
+                               filteredCount++
                                continue
                        }
 
                        // Build LbEndpoint
                        lbEp := b.buildLbEndpoint(ep)
-                       if lbEp != nil {
-                               lbEndpoints = append(lbEndpoints, lbEp)
+                       if lbEp == nil {
+                               klog.V(3).Infof("BuildClusterLoadAssignment: 
buildLbEndpoint returned nil for endpoint %s", ep.FirstAddressOrNil())
+                               filteredCount++
+                               continue
                        }
+                       lbEndpoints = append(lbEndpoints, lbEp)
                }
        }
 
        if len(lbEndpoints) == 0 {
+               klog.V(2).Infof("BuildClusterLoadAssignment: no endpoints found 
for cluster %s (hostname=%s, port=%d, totalEndpoints=%d, filteredCount=%d)",
+                       b.clusterName, b.hostname, b.port, totalEndpoints, 
filteredCount)
                return buildEmptyClusterLoadAssignment(b.clusterName)
        }
 
@@ -216,18 +253,28 @@ func (b *EndpointBuilder) Cacheable() bool {
 
 // Key implements model.XdsCacheEntry
 func (b *EndpointBuilder) Key() any {
-       // Simple key based on cluster name for now
-       // TODO: implement proper hashing if needed for cache optimization
-       return b.clusterName
+       // CRITICAL FIX: EDS cache expects uint64 key, not string
+       // Hash the cluster name to uint64 to match the cache type
+       return xxhash.Sum64String(b.clusterName)
 }
 
 // Type implements model.XdsCacheEntry
 func (b *EndpointBuilder) Type() string {
-       return "EDS"
+       return "eds" // Must match model.EDSType constant ("eds", not "EDS")
 }
 
 // DependentConfigs implements model.XdsCacheEntry
 func (b *EndpointBuilder) DependentConfigs() []model.ConfigHash {
-       // Return empty slice for now - can be enhanced to return ServiceEntry 
config hash if needed
-       return nil
+       // CRITICAL FIX: Return ServiceEntry ConfigHash so that EDS cache can 
be properly cleared
+       // when endpoints are updated. Without this, cache.Clear() cannot find 
and remove stale EDS entries.
+       if b.service == nil {
+               return nil
+       }
+       // Create ConfigKey for ServiceEntry and return its hash
+       configKey := model.ConfigKey{
+               Kind:      kind.ServiceEntry,
+               Name:      string(b.hostname),
+               Namespace: b.service.Attributes.Namespace,
+       }
+       return []model.ConfigHash{configKey.HashCode()}
 }
diff --git a/sail/pkg/xds/lds.go b/sail/pkg/xds/lds.go
index e404dbb6..4d434bb2 100644
--- a/sail/pkg/xds/lds.go
+++ b/sail/pkg/xds/lds.go
@@ -24,13 +24,21 @@ func ldsNeedsPush(proxy *model.Proxy, req 
*model.PushRequest) bool {
        return false
 }
 
-func (l LdsGenerator) Generate(proxy *model.Proxy, _ *model.WatchedResource, 
req *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
+func (l LdsGenerator) Generate(proxy *model.Proxy, w *model.WatchedResource, 
req *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
        if !ldsNeedsPush(proxy, req) {
                return nil, model.DefaultXdsLogDetails, nil
        }
+       // For standard ConfigGenerator (Envoy), BuildListeners doesn't take 
names parameter
+       // The names filtering is handled by the resource filtering logic in 
pushXds
        listeners := l.ConfigGenerator.BuildListeners(proxy, req.Push)
        resources := model.Resources{}
        for _, c := range listeners {
+               // Filter resources based on WatchedResource.ResourceNames if 
specified
+               if w != nil && w.ResourceNames != nil && len(w.ResourceNames) > 
0 {
+                       if !w.ResourceNames.Contains(c.Name) {
+                               continue
+                       }
+               }
                resources = append(resources, &discovery.Resource{
                        Name:     c.Name,
                        Resource: protoconv.MessageToAny(c),
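
The LDS filter above only returns listeners the client actually subscribed to, treating an empty watch list as a wildcard. A self-contained sketch of that filtering rule, using a plain map in place of the repo's sets type:

    package main

    import "fmt"

    type resource struct{ Name string }

    func filterByWatched(all []resource, watched map[string]struct{}) []resource {
        if len(watched) == 0 {
            return all // wildcard watch: send everything
        }
        out := make([]resource, 0, len(all))
        for _, r := range all {
            if _, ok := watched[r.Name]; ok {
                out = append(out, r)
            }
        }
        return out
    }

    func main() {
        all := []resource{{Name: "0.0.0.0:8080"}, {Name: "10.0.0.5:17070"}}
        watched := map[string]struct{}{"0.0.0.0:8080": {}}
        fmt.Println(filterByWatched(all, watched)) // only the watched listener remains
    }
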
diff --git a/sail/pkg/xds/xdsgen.go b/sail/pkg/xds/xdsgen.go
index e78afe5d..611d8729 100644
--- a/sail/pkg/xds/xdsgen.go
+++ b/sail/pkg/xds/xdsgen.go
@@ -6,13 +6,16 @@ import (
        "strconv"
        "strings"
 
+       "github.com/apache/dubbo-kubernetes/pkg/config/host"
        "github.com/apache/dubbo-kubernetes/pkg/env"
        "github.com/apache/dubbo-kubernetes/pkg/lazy"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        dubboversion "github.com/apache/dubbo-kubernetes/pkg/version"
        "github.com/apache/dubbo-kubernetes/pkg/xds"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
        "github.com/apache/dubbo-kubernetes/sail/pkg/networking/util"
        v3 "github.com/apache/dubbo-kubernetes/sail/pkg/xds/v3"
+       cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
        core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
        discovery 
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
        "k8s.io/klog/v2"
@@ -64,6 +67,91 @@ func (s *DiscoveryServer) pushXds(con *Connection, w 
*model.WatchedResource, req
                return nil
        }
 
+       // CRITICAL FIX: For proxyless gRPC, handle wildcard (empty 
ResourceNames) requests correctly
+       // When client sends empty ResourceNames after receiving specific 
resources, it's likely an ACK
+       // We should NOT generate all resources, but instead return the last 
sent resources
+       // However, for initial wildcard requests, we need to extract resource 
names from parent resources
+       var requestedResourceNames sets.String
+       var useLastSentResources bool
+       if con.proxy.IsProxylessGrpc() {
+               // Check if this is a wildcard request (empty ResourceNames) 
but we have previously sent resources
+               if len(w.ResourceNames) == 0 && w.NonceSent != "" {
+                       // This is likely an ACK after receiving specific 
resources
+                       // Use the last sent resources instead of generating all
+                       useLastSentResources = true
+                       // Get the last sent resource names from WatchedResource
+                       // We'll populate this from the last sent resources 
after generation
+                       klog.V(2).Infof("pushXds: proxyless gRPC wildcard 
request with NonceSent=%s, will use last sent resources", w.NonceSent)
+               } else if len(w.ResourceNames) == 0 && w.NonceSent == "" {
+                       // CRITICAL FIX: Initial wildcard request - need to 
extract resource names from parent resources
+                       // For CDS: extract cluster names from LDS
+                       // For EDS: extract cluster names from CDS
+                       if w.TypeUrl == v3.ClusterType {
+                               // Extract cluster names from LDS response
+                               ldsWatched := 
con.proxy.GetWatchedResource(v3.ListenerType)
+                               if ldsWatched != nil && ldsWatched.NonceSent != 
"" {
+                                       // LDS has been sent, extract cluster 
names from it
+                                       // We need to regenerate LDS to extract 
cluster names, or store them
+                                       // For now, let's extract from the 
proxy's ServiceTargets or from LDS generation
+                                       klog.V(2).Infof("pushXds: CDS wildcard 
request, extracting cluster names from LDS")
+                                       // Create a temporary request to 
generate LDS and extract cluster names
+                                       ldsReq := &model.PushRequest{
+                                               Full:   true,
+                                               Push:   req.Push,
+                                               Reason: 
model.NewReasonStats(model.DependentResource),
+                                               Start:  con.proxy.LastPushTime,
+                                               Forced: false,
+                                       }
+                                       ldsGen := 
s.findGenerator(v3.ListenerType, con)
+                                       if ldsGen != nil {
+                                               ldsRes, _, _ := 
ldsGen.Generate(con.proxy, ldsWatched, ldsReq)
+                                               if len(ldsRes) > 0 {
+                                                       clusterNames := 
extractClusterNamesFromLDS(ldsRes)
+                                                       if len(clusterNames) > 
0 {
+                                                               w.ResourceNames 
= sets.New(clusterNames...)
+                                                               
requestedResourceNames = sets.New[string]()
+                                                               
requestedResourceNames.InsertAll(clusterNames...)
+                                                               
klog.V(2).Infof("pushXds: extracted %d cluster names from LDS: %v", 
len(clusterNames), clusterNames)
+                                                       }
+                                               }
+                                       }
+                               }
+                       } else if w.TypeUrl == v3.EndpointType {
+                               // Extract cluster names from CDS response
+                               cdsWatched := 
con.proxy.GetWatchedResource(v3.ClusterType)
+                               if cdsWatched != nil && cdsWatched.NonceSent != 
"" {
+                                       // CDS has been sent, extract EDS 
cluster names from it
+                                       klog.V(2).Infof("pushXds: EDS wildcard 
request, extracting cluster names from CDS")
+                                       // Create a temporary request to 
generate CDS and extract EDS cluster names
+                                       cdsReq := &model.PushRequest{
+                                               Full:   true,
+                                               Push:   req.Push,
+                                               Reason: 
model.NewReasonStats(model.DependentResource),
+                                               Start:  con.proxy.LastPushTime,
+                                               Forced: false,
+                                       }
+                                       cdsGen := 
s.findGenerator(v3.ClusterType, con)
+                                       if cdsGen != nil {
+                                               cdsRes, _, _ := 
cdsGen.Generate(con.proxy, cdsWatched, cdsReq)
+                                               if len(cdsRes) > 0 {
+                                                       edsClusterNames := 
extractEDSClusterNamesFromCDS(cdsRes)
+                                                       if len(edsClusterNames) 
> 0 {
+                                                               w.ResourceNames 
= sets.New(edsClusterNames...)
+                                                               
requestedResourceNames = sets.New[string]()
+                                                               
requestedResourceNames.InsertAll(edsClusterNames...)
+                                                               
klog.V(2).Infof("pushXds: extracted %d EDS cluster names from CDS: %v", 
len(edsClusterNames), edsClusterNames)
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               } else if len(w.ResourceNames) > 0 {
+                       // Specific resource request
+                       requestedResourceNames = sets.New[string]()
+                       
requestedResourceNames.InsertAll(w.ResourceNames.UnsortedList()...)
+               }
+       }
+
        // If delta is set, client is requesting new resources or removing old 
ones. We should just generate the
        // new resources it needs, rather than the entire set of known 
resources.
        // Note: we do not need to account for unsubscribed resources as these 
are handled by parent removal;
@@ -78,7 +166,19 @@ func (s *DiscoveryServer) pushXds(con *Connection, w 
*model.WatchedResource, req
                }
        }
 
-       res, logdata, err := gen.Generate(con.proxy, w, req)
+       // For proxyless gRPC wildcard requests with previous NonceSent, use 
last sent resources
+       var res model.Resources
+       var logdata model.XdsLogDetails
+       var err error
+       if useLastSentResources {
+               // Don't generate new resources, return empty and we'll handle 
it below
+               res = nil
+               logdata = model.DefaultXdsLogDetails
+               err = nil
+       } else {
+               res, logdata, err = gen.Generate(con.proxy, w, req)
+       }
+
        info := ""
        if len(logdata.AdditionalInfo) > 0 {
                info = " " + logdata.AdditionalInfo
@@ -86,15 +186,69 @@ func (s *DiscoveryServer) pushXds(con *Connection, w 
*model.WatchedResource, req
        if len(logFiltered) > 0 {
                info += logFiltered
        }
-       if err != nil || res == nil {
+       if err != nil {
                return err
        }
 
+       // CRITICAL FIX: For proxyless gRPC wildcard requests with previous 
NonceSent, return last sent resources
+       if useLastSentResources && res == nil {
+               // This is a wildcard ACK - client is acknowledging previous 
push
+               // We should NOT push again, as the client already has the 
resources
+               // The ShouldRespond logic should have prevented this, but we 
handle it here as safety
+               klog.V(2).Infof("pushXds: proxyless gRPC wildcard ACK with 
NonceSent=%s, skipping push (client already has resources from previous push)", 
w.NonceSent)
+               return nil
+       }
+
+       if res == nil {
+               return nil
+       }
+
+       // CRITICAL FIX: For proxyless gRPC, filter resources to only include 
requested ones
+       // This prevents the push loop where client requests 1 resource but 
receives 13/14
+       var filteredRes model.Resources
+       if con.proxy.IsProxylessGrpc() {
+               if len(requestedResourceNames) > 0 {
+                       // Filter to only requested resources
+                       filteredRes = make(model.Resources, 0, len(res))
+                       for _, r := range res {
+                               if requestedResourceNames.Contains(r.Name) {
+                                       filteredRes = append(filteredRes, r)
+                               } else {
+                                       klog.V(2).Infof("pushXds: filtering out 
unrequested resource %s for proxyless gRPC (requested: %v)", r.Name, 
requestedResourceNames.UnsortedList())
+                               }
+                       }
+                       if len(filteredRes) != len(res) {
+                               info += " filtered:" + 
strconv.Itoa(len(res)-len(filteredRes))
+                               res = filteredRes
+                       }
+                       // CRITICAL: If filtering resulted in 0 resources but 
client requested specific resources,
+                       // this means the requested resources don't exist. 
Don't send empty response to avoid loop.
+                       // Instead, log and return nil to prevent push.
+                       if len(res) == 0 && len(requestedResourceNames) > 0 {
+                               klog.Warningf("pushXds: proxyless gRPC 
requested %d resources but none matched after filtering (requested: %v, 
generated before filter: %d). Skipping push to avoid loop.",
+                                       len(requestedResourceNames), 
requestedResourceNames.UnsortedList(), len(filteredRes)+len(res))
+                               return nil
+                       }
+               } else if len(w.ResourceNames) == 0 {
+                       // Wildcard request without previous NonceSent - this 
is initial request
+                       // Allow generating all resources for initial connection
+                       klog.V(2).Infof("pushXds: proxyless gRPC initial 
wildcard request, generating all resources")
+               }
+       }
+
+       // CRITICAL: Never send empty response for proxyless gRPC - this causes 
push loops
+       // If we have no resources to send, return nil instead of sending empty 
response
+       if len(res) == 0 {
+               klog.V(2).Infof("pushXds: no resources to send for %s (proxy: 
%s), skipping push", w.TypeUrl, con.proxy.ID)
+               return nil
+       }
+
+       nonceValue := nonce(req.Push.PushVersion)
        resp := &discovery.DiscoveryResponse{
                ControlPlane: ControlPlane(w.TypeUrl),
                TypeUrl:      w.TypeUrl,
                VersionInfo:  req.Push.PushVersion,
-               Nonce:        nonce(req.Push.PushVersion),
+               Nonce:        nonceValue,
                Resources:    xds.ResourcesToAny(res),
        }
 
@@ -107,17 +261,216 @@ func (s *DiscoveryServer) pushXds(con *Connection, w 
*model.WatchedResource, req
                return err
        }
 
+       // CRITICAL FIX: Update NonceSent after successfully sending the 
response
+       // This is essential for tracking which nonce was sent and preventing 
push loops
+       con.proxy.UpdateWatchedResource(w.TypeUrl, func(wr 
*model.WatchedResource) *model.WatchedResource {
+               if wr == nil {
+                       return nil
+               }
+               wr.NonceSent = nonceValue
+               // Also update ResourceNames to match what we actually sent 
(for proxyless gRPC)
+               if con.proxy.IsProxylessGrpc() && res != nil {
+                       sentNames := sets.New[string]()
+                       for _, r := range res {
+                               sentNames.Insert(r.Name)
+                       }
+                       // Only update if we sent different resources than 
requested
+                       if requestedResourceNames != nil {
+                               // Compare sets by checking if they have the 
same size and all elements match
+                               if sentNames.Len() != 
requestedResourceNames.Len() {
+                                       wr.ResourceNames = sentNames
+                               } else {
+                                       // Check if all sent names are in 
requested names
+                                       allMatch := true
+                                       for name := range sentNames {
+                                               if 
!requestedResourceNames.Contains(name) {
+                                                       allMatch = false
+                                                       break
+                                               }
+                                       }
+                                       if !allMatch {
+                                               wr.ResourceNames = sentNames
+                                       }
+                               }
+                       }
+               }
+               return wr
+       })
+
        switch {
        case !req.Full:
        default:
                // Log format matches Istio: "LDS: PUSH for node:xxx 
resources:1 size:342B"
-               klog.Infof("%s: %s for node:%s resources:%d size:%s", 
v3.GetShortType(w.TypeUrl), ptype, con.proxy.ID, len(res),
-                       util.ByteCount(ResourceSize(res)))
+               resourceNamesStr := ""
+               if len(res) > 0 && len(res) <= 5 {
+                       // Log resource names if there are few resources (for 
debugging)
+                       names := make([]string, 0, len(res))
+                       for _, r := range res {
+                               names = append(names, r.Name)
+                       }
+                       resourceNamesStr = fmt.Sprintf(" [%s]", 
strings.Join(names, ", "))
+               }
+               klog.Infof("%s: %s for node:%s resources:%d size:%s%s%s", 
v3.GetShortType(w.TypeUrl), ptype, con.proxy.ID, len(res),
+                       util.ByteCount(ResourceSize(res)), info, 
resourceNamesStr)
+       }
+
+       // CRITICAL FIX: For proxyless gRPC, after pushing LDS with outbound 
listeners,
+       // automatically trigger CDS push for the referenced clusters ONLY if 
this is a direct request push
+       // (not a push from pushConnection which would cause loops)
+       // Only auto-push if CDS is not already being watched by the client 
(client will request it naturally)
+       if w.TypeUrl == v3.ListenerType && con.proxy.IsProxylessGrpc() && 
len(res) > 0 {
+               // Only auto-push CDS if this is a direct request (not a full 
push from pushConnection)
+               // Check if this push was triggered by a direct client request 
using IsRequest()
+               isDirectRequest := req.IsRequest()
+               if isDirectRequest {
+                       clusterNames := extractClusterNamesFromLDS(res)
+                       if len(clusterNames) > 0 {
+                               cdsWatched := 
con.proxy.GetWatchedResource(v3.ClusterType)
+                               // Only auto-push CDS if client hasn't already 
requested it
+                               // If client has already requested CDS 
(WatchedResource exists with ResourceNames),
+                               // the client's request will handle it, so we 
don't need to auto-push
+                               if cdsWatched == nil || 
cdsWatched.ResourceNames == nil || len(cdsWatched.ResourceNames) == 0 {
+                                       // Client hasn't requested CDS yet, 
auto-push to ensure client gets the cluster config
+                                       
con.proxy.NewWatchedResource(v3.ClusterType, clusterNames)
+                                       klog.V(2).Infof("pushXds: LDS push 
completed, auto-pushing CDS for clusters: %v", clusterNames)
+                                       // Trigger CDS push directly without 
going through pushConnection to avoid loops
+                                       cdsReq := &model.PushRequest{
+                                               Full:   true,
+                                               Push:   req.Push,
+                                               Reason: 
model.NewReasonStats(model.ProxyRequest),
+                                               Start:  con.proxy.LastPushTime,
+                                               Forced: false,
+                                       }
+                                       if err := s.pushXds(con, 
con.proxy.GetWatchedResource(v3.ClusterType), cdsReq); err != nil {
+                                               klog.Warningf("pushXds: failed 
to push CDS after LDS: %v", err)
+                                       }
+                               } else {
+                                       // Client has already requested CDS, 
let the client's request handle it
+                                       klog.V(2).Infof("pushXds: LDS push 
completed, client already requested CDS, skipping auto-push")
+                               }
+                       }
+               }
+       }
+
+       // CRITICAL FIX: For proxyless gRPC, after pushing CDS with EDS 
clusters,
+       // automatically trigger EDS push for the referenced endpoints
+       // This is necessary for load balancing - client needs EDS to discover 
all available endpoints
+       if w.TypeUrl == v3.ClusterType && con.proxy.IsProxylessGrpc() && 
len(res) > 0 {
+               // Extract EDS cluster names from CDS resources
+               edsClusterNames := extractEDSClusterNamesFromCDS(res)
+               if len(edsClusterNames) > 0 {
+                       edsWatched := 
con.proxy.GetWatchedResource(v3.EndpointType)
+                       shouldPushEDS := false
+                       if edsWatched == nil {
+                               // EDS not watched yet, create watched resource
+                               con.proxy.NewWatchedResource(v3.EndpointType, 
edsClusterNames)
+                               shouldPushEDS = true
+                               klog.V(2).Infof("pushXds: CDS push completed, 
auto-pushing EDS for clusters: %v", edsClusterNames)
+                       } else {
+                               // Check if any cluster names are missing from 
the watched set
+                               existingNames := edsWatched.ResourceNames
+                               if existingNames == nil {
+                                       existingNames = sets.New[string]()
+                               }
+                               hasNewClusters := false
+                               for _, cn := range edsClusterNames {
+                                       if !existingNames.Contains(cn) {
+                                               hasNewClusters = true
+                                               break
+                                       }
+                               }
+                               if hasNewClusters {
+                                       // Update EDS watched resource to 
include the new cluster names
+                                       
con.proxy.UpdateWatchedResource(v3.EndpointType, func(wr 
*model.WatchedResource) *model.WatchedResource {
+                                               if wr == nil {
+                                                       wr = 
&model.WatchedResource{TypeUrl: v3.EndpointType, ResourceNames: 
sets.New[string]()}
+                                               }
+                                               existingNames := 
wr.ResourceNames
+                                               if existingNames == nil {
+                                                       existingNames = 
sets.New[string]()
+                                               }
+                                               for _, cn := range 
edsClusterNames {
+                                                       existingNames.Insert(cn)
+                                               }
+                                               wr.ResourceNames = existingNames
+                                               return wr
+                                       })
+                                       shouldPushEDS = true
+                                       klog.V(2).Infof("pushXds: CDS push 
completed, updating EDS watched resource with new clusters: %v", 
edsClusterNames)
+                               } else {
+                                       klog.V(2).Infof("pushXds: CDS push 
completed, EDS clusters already watched: %v", edsClusterNames)
+                               }
+                       }
+
+                       // Only push EDS if we have new clusters to push
+                       if shouldPushEDS {
+                               // Trigger EDS push directly without going 
through pushConnection to avoid loops
+                               edsReq := &model.PushRequest{
+                                       Full:   true,
+                                       Push:   req.Push,
+                                       Reason: 
model.NewReasonStats(model.DependentResource),
+                                       Start:  con.proxy.LastPushTime,
+                                       Forced: true, // Force EDS push to 
ensure endpoints are available for load balancing
+                               }
+                               if err := s.pushXds(con, 
con.proxy.GetWatchedResource(v3.EndpointType), edsReq); err != nil {
+                                       klog.Warningf("pushXds: failed to push 
EDS after CDS: %v", err)
+                               }
+                       }
+               }
        }
 
        return nil
 }
 
+// extractEDSClusterNamesFromCDS extracts EDS cluster names from CDS cluster 
resources
+// Only clusters with ClusterDiscoveryType=EDS are returned
+func extractEDSClusterNamesFromCDS(clusters model.Resources) []string {
+       clusterNames := sets.New[string]()
+       for _, r := range clusters {
+               // Unmarshal the cluster resource to check its type
+               cl := &cluster.Cluster{}
+               if err := r.Resource.UnmarshalTo(cl); err != nil {
+                       klog.V(2).Infof("extractEDSClusterNamesFromCDS: failed 
to unmarshal cluster %s: %v", r.Name, err)
+                       continue
+               }
+               // Check if this is an EDS cluster
+               if cl.ClusterDiscoveryType != nil {
+                       if edsType, ok := 
cl.ClusterDiscoveryType.(*cluster.Cluster_Type); ok && edsType.Type == 
cluster.Cluster_EDS {
+                               // This is an EDS cluster, add to the list
+                               clusterNames.Insert(cl.Name)
+                       }
+               }
+       }
+       return clusterNames.UnsortedList()
+}
+
+// extractClusterNamesFromLDS extracts cluster names referenced in LDS 
listener resources
+// For outbound listeners with name format "hostname:port", the cluster name 
is "outbound|port||hostname"
+func extractClusterNamesFromLDS(listeners model.Resources) []string {
+       clusterNames := sets.New[string]()
+       for _, r := range listeners {
+               // Parse listener name to extract cluster name
+               // Outbound listener format: "hostname:port" -> cluster: 
"outbound|port||hostname"
+               // Inbound listener format: 
"xds.dubbo.io/grpc/lds/inbound/[::]:port" -> no cluster
+               listenerName := r.Name
+               if strings.Contains(listenerName, ":") && 
!strings.HasPrefix(listenerName, "xds.dubbo.io/grpc/lds/inbound/") {
+                       // This is an outbound listener
+                       parts := strings.Split(listenerName, ":")
+                       if len(parts) == 2 {
+                               hostname := parts[0]
+                               portStr := parts[1]
+                               port, err := strconv.Atoi(portStr)
+                               if err == nil {
+                                       // Build cluster name: 
outbound|port||hostname
+                                       clusterName := 
model.BuildSubsetKey(model.TrafficDirectionOutbound, "", host.Name(hostname), 
port)
+                                       clusterNames.Insert(clusterName)
+                               }
+                       }
+               }
+       }
+       return clusterNames.UnsortedList()
+}
+
 func ResourceSize(r model.Resources) int {
        size := 0
        for _, r := range r {
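The listener-name to cluster-name convention that extractClusterNamesFromLDS and extractEDSClusterNamesFromCDS rely on is easy to get wrong, so here is a minimal standalone sketch (not part of the patch) that spells it out, using the consumer Service from the sample manifest below; the exact output string assumes the Istio-style subset key format described in the comments above.

```go
// Standalone illustration (not part of the patch) of the naming convention
// described in extractClusterNamesFromLDS: an outbound LDS listener named
// "hostname:port" maps to the cluster "outbound|port||hostname". The hostname
// and port below come from the consumer Service in the sample manifest.
package main

import "fmt"

func main() {
	hostname := "consumer.grpc-proxyless.svc.cluster.local"
	port := 7070

	// Outbound LDS listener name as generated for proxyless gRPC clients.
	listener := fmt.Sprintf("%s:%d", hostname, port)

	// The matching CDS/EDS cluster name, i.e. what
	// model.BuildSubsetKey(model.TrafficDirectionOutbound, "", hostname, port)
	// is expected to produce under the Istio-style subset key format.
	cluster := fmt.Sprintf("outbound|%d||%s", port, hostname)

	fmt.Printf("%s -> %s\n", listener, cluster)
	// consumer.grpc-proxyless.svc.cluster.local:7070 -> outbound|7070||consumer.grpc-proxyless.svc.cluster.local
}
```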
diff --git a/samples/example/README.md b/samples/example/README.md
deleted file mode 100644
index 924c415c..00000000
--- a/samples/example/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# Example
\ No newline at end of file
diff --git a/samples/grpc-proxyless/grpc-proxyless.yaml 
b/samples/grpc-proxyless/grpc-proxyless.yaml
new file mode 100644
index 00000000..f13deb9c
--- /dev/null
+++ b/samples/grpc-proxyless/grpc-proxyless.yaml
@@ -0,0 +1,107 @@
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app: consumer
+  name: consumer
+  namespace: grpc-proxyless
+spec:
+  selector:
+    app: consumer
+  type: ClusterIP
+  ports:
+    - name: grpc
+      port: 7070
+      targetPort: 17070
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: consumer
+  namespace: grpc-proxyless
+spec:
+  replicas: 2
+  selector:
+    matchLabels:
+      app: consumer
+  template:
+    metadata:
+      annotations:
+        proxyless.dubbo.io/inject: "true"
+        inject.dubbo.io/templates: grpc-agent
+        proxy.dubbo.io/config: '{"holdApplicationUntilProxyStarts": true}'
+      labels:
+        app: consumer
+    spec:
+      containers:
+        - name: app
+          image: mfordjody/grpc-consumer:latest
+          imagePullPolicy: Always
+          args:
+            - --port=17070
+          ports:
+            - containerPort: 17070
+              protocol: TCP
+              name: grpc
+          env:
+            - name: INSTANCE_IP
+              valueFrom:
+                fieldRef:
+                  apiVersion: v1
+                  fieldPath: status.podIP
+          readinessProbe:
+            tcpSocket:
+              port: 17070
+            initialDelaySeconds: 5
+            periodSeconds: 5
+            timeoutSeconds: 2
+            successThreshold: 1
+            failureThreshold: 3
+          livenessProbe:
+            tcpSocket:
+              port: 17070
+            initialDelaySeconds: 10
+            periodSeconds: 10
+            timeoutSeconds: 2
+            successThreshold: 1
+            failureThreshold: 3
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: producer
+  namespace: grpc-proxyless
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: producer
+  template:
+    metadata:
+      annotations:
+        proxyless.dubbo.io/inject: "true"
+        inject.dubbo.io/templates: grpc-agent
+        proxy.dubbo.io/config: '{"holdApplicationUntilProxyStarts": true}'
+      labels:
+        app: producer
+    spec:
+      containers:
+        - name: app
+          # Replace with your own image containing the producer binary
+          image: mfordjody/grpc-producer:latest
+          imagePullPolicy: Always
+          args:
+            - --port=17171
+          # Optional: uncomment to test direct connection
+          # - --target=xds:///consumer.grpc-proxyless.svc.cluster.local:7070
+          # - --count=10
+          ports:
+            - containerPort: 17171
+              protocol: TCP
+              name: grpc-test
+          env:
+            - name: INSTANCE_IP
+              valueFrom:
+                fieldRef:
+                  apiVersion: v1
+                  fieldPath: status.podIP
diff --git a/test/grpc-proxyless/.dockerignore 
b/test/grpc-proxyless/.dockerignore
new file mode 100644
index 00000000..ffa25274
--- /dev/null
+++ b/test/grpc-proxyless/.dockerignore
@@ -0,0 +1,6 @@
+*.md
+bin/
+*.pb.go
+.git/
+.gitignore
+README.md
\ No newline at end of file
diff --git a/test/grpc-proxyless/.gitignore b/test/grpc-proxyless/.gitignore
new file mode 100644
index 00000000..db441f49
--- /dev/null
+++ b/test/grpc-proxyless/.gitignore
@@ -0,0 +1,2 @@
+bin/
+*.pb.go
diff --git a/test/grpc-proxyless/Dockerfile.consumer 
b/test/grpc-proxyless/Dockerfile.consumer
new file mode 100644
index 00000000..0ed7fd18
--- /dev/null
+++ b/test/grpc-proxyless/Dockerfile.consumer
@@ -0,0 +1,35 @@
+FROM golang:1.24-alpine AS builder
+
+WORKDIR /build
+
+RUN apk add --no-cache protobuf
+
+COPY go.mod go.sum ./
+RUN go mod download
+
+RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest && \
+    go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
+
+ENV PATH=$PATH:/go/bin:/root/go/bin
+
+COPY proto/echo.proto ./proto/
+RUN protoc --go_out=. --go_opt=paths=source_relative \
+    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
+    proto/echo.proto
+
+COPY consumer/ ./consumer/
+
+ARG GOOS=linux
+ARG GOARCH=amd64
+RUN GOOS=${GOOS} GOARCH=${GOARCH} CGO_ENABLED=0 go build -a -ldflags 
'-extldflags "-static"' -o /build/grpc-consumer ./consumer/
+
+FROM alpine:latest
+RUN apk update && \
+    apk --no-cache add ca-certificates tzdata || \
+    (sleep 2 && apk update && apk --no-cache add ca-certificates tzdata)
+WORKDIR /app
+
+COPY --from=builder /build/grpc-consumer /usr/local/bin/grpc-consumer
+RUN chmod +x /usr/local/bin/grpc-consumer
+
+ENTRYPOINT ["/usr/local/bin/grpc-consumer"]
diff --git a/test/grpc-proxyless/Dockerfile.producer 
b/test/grpc-proxyless/Dockerfile.producer
new file mode 100644
index 00000000..38add387
--- /dev/null
+++ b/test/grpc-proxyless/Dockerfile.producer
@@ -0,0 +1,35 @@
+FROM golang:1.24-alpine AS builder
+
+WORKDIR /build
+
+RUN apk add --no-cache protobuf
+
+COPY go.mod go.sum ./
+RUN go mod download
+
+RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest && \
+    go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
+
+ENV PATH=$PATH:/go/bin:/root/go/bin
+
+COPY proto/echo.proto ./proto/
+RUN protoc --go_out=. --go_opt=paths=source_relative \
+    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
+    proto/echo.proto
+
+COPY producer/ ./producer/
+
+ARG GOOS=linux
+ARG GOARCH=amd64
+RUN GOOS=${GOOS} GOARCH=${GOARCH} CGO_ENABLED=0 go build -a -ldflags 
'-extldflags "-static"' -o /build/grpc-producer ./producer/
+
+FROM alpine:latest
+RUN apk update && \
+    apk --no-cache add ca-certificates tzdata || \
+    (sleep 2 && apk update && apk --no-cache add ca-certificates tzdata)
+WORKDIR /app
+
+COPY --from=builder /build/grpc-producer /usr/local/bin/grpc-producer
+RUN chmod +x /usr/local/bin/grpc-producer
+
+ENTRYPOINT ["/usr/local/bin/grpc-producer"]
diff --git a/test/grpc-proxyless/README.md b/test/grpc-proxyless/README.md
new file mode 100644
index 00000000..fa268450
--- /dev/null
+++ b/test/grpc-proxyless/README.md
@@ -0,0 +1,12 @@
+# gRPC Proxyless Test Example
+
+This is a test example for a gRPC proxyless service mesh, based on [Istio's blog post](https://istio.io/latest/blog/2021/proxyless-grpc/).
+
+## Architecture
+
+- **Consumer**: gRPC server with xDS support (port 17070)
+- **Producer**: gRPC client with xDS support + test server (port 17171)
+
+Both services use the `dubbo-proxy` sidecar as an xDS proxy to connect to the control plane. The sidecar runs an xDS proxy server that listens on a Unix Domain Socket (UDS) at `/etc/dubbo/proxy/XDS`. The gRPC applications locate this socket through the bootstrap file referenced by the `GRPC_XDS_BOOTSTRAP` environment variable and connect to the xDS proxy over it.
+
+**Note**: This is "proxyless" in the sense that the applications use native 
gRPC xDS clients instead of Envoy proxy for traffic routing. However, a 
lightweight sidecar (`dubbo-proxy`) is still used to proxy xDS API calls 
between the gRPC clients and the control plane.
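
To make the wiring above concrete, the following sketch prints a bootstrap document of the shape the test applications expect `GRPC_XDS_BOOTSTRAP` to point at. The `server_uri` mirrors the UDS path mentioned above and the `xds_servers[0].server_uri` layout matches what `producer/main.go` parses; the `channel_creds` type, the `server_features` entry, and the node id are illustrative assumptions, since the real file is generated by the dubbo-agent and is not part of this commit.

```go
// Illustrative only: the shape of bootstrap file the test apps expect
// GRPC_XDS_BOOTSTRAP to reference. The UDS path matches the README above;
// channel_creds, server_features, and the node id are assumptions.
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	bootstrap := map[string]any{
		"xds_servers": []any{
			map[string]any{
				"server_uri":      "unix:///etc/dubbo/proxy/XDS",
				"channel_creds":   []any{map[string]any{"type": "insecure"}},
				"server_features": []any{"xds_v3"},
			},
		},
		"node": map[string]any{"id": "example-node-id"},
	}
	out, _ := json.MarshalIndent(bootstrap, "", "  ")
	fmt.Println(string(out))
}
```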
diff --git a/test/grpc-proxyless/consumer/main.go 
b/test/grpc-proxyless/consumer/main.go
new file mode 100644
index 00000000..61ce9f4d
--- /dev/null
+++ b/test/grpc-proxyless/consumer/main.go
@@ -0,0 +1,135 @@
+package main
+
+import (
+       "context"
+       "flag"
+       "fmt"
+       "log"
+       "net"
+       "os"
+       "os/signal"
+       "syscall"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       xdscreds "google.golang.org/grpc/credentials/xds"
+       "google.golang.org/grpc/reflection"
+       "google.golang.org/grpc/xds"
+
+       pb "github.com/apache/dubbo-kubernetes/test/grpc-proxyless/proto"
+)
+
+var (
+       port = flag.Int("port", 17070, "gRPC server port")
+)
+
+type echoServer struct {
+       pb.UnimplementedEchoServiceServer
+       pb.UnimplementedEchoTestServiceServer
+       hostname string
+}
+
+func (s *echoServer) Echo(ctx context.Context, req *pb.EchoRequest) 
(*pb.EchoResponse, error) {
+       if req == nil {
+               return nil, fmt.Errorf("request is nil")
+       }
+       log.Printf("Received: %v", req.Message)
+       return &pb.EchoResponse{
+               Message:  req.Message,
+               Hostname: s.hostname,
+       }, nil
+}
+
+func (s *echoServer) StreamEcho(req *pb.EchoRequest, stream 
pb.EchoService_StreamEchoServer) error {
+       if req == nil {
+               return fmt.Errorf("request is nil")
+       }
+       if stream == nil {
+               return fmt.Errorf("stream is nil")
+       }
+       log.Printf("StreamEcho received: %v", req.Message)
+       for i := 0; i < 3; i++ {
+               if err := stream.Send(&pb.EchoResponse{
+                       Message:  fmt.Sprintf("%s [%d]", req.Message, i),
+                       Hostname: s.hostname,
+               }); err != nil {
+                       log.Printf("StreamEcho send error: %v", err)
+                       return err
+               }
+       }
+       return nil
+}
+
+func (s *echoServer) ForwardEcho(ctx context.Context, req 
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
+       if req == nil {
+               return nil, fmt.Errorf("request is nil")
+       }
+
+       count := req.Count
+       if count < 0 {
+               count = 0
+       }
+       if count > 100 {
+               count = 100
+       }
+
+       log.Printf("ForwardEcho called: url=%s, count=%d", req.Url, count)
+
+       output := make([]string, 0, count)
+       for i := int32(0); i < count; i++ {
+               line := fmt.Sprintf("[%d body] Hostname=%s", i, s.hostname)
+               output = append(output, line)
+       }
+
+       return &pb.ForwardEchoResponse{
+               Output: output,
+       }, nil
+}
+
+func main() {
+       flag.Parse()
+
+       hostname, _ := os.Hostname()
+       if hostname == "" {
+               hostname = "unknown"
+       }
+
+       // Create xDS-enabled gRPC server
+       // For proxyless gRPC, we use xds.NewGRPCServer() instead of 
grpc.NewServer()
+       creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
+               FallbackCreds: insecure.NewCredentials(),
+       })
+       if err != nil {
+               log.Fatalf("Failed to create xDS server credentials: %v", err)
+       }
+
+       server, err := xds.NewGRPCServer(grpc.Creds(creds))
+       if err != nil {
+               log.Fatalf("Failed to create xDS gRPC server: %v", err)
+       }
+
+       es := &echoServer{hostname: hostname}
+       pb.RegisterEchoServiceServer(server, es)
+       pb.RegisterEchoTestServiceServer(server, es)
+       // Enable reflection API for grpcurl to discover services
+       reflection.Register(server)
+
+       lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
+       if err != nil {
+               log.Fatalf("Failed to listen: %v", err)
+       }
+
+       log.Printf("Starting gRPC proxyless server on port %d (hostname: %s)", 
*port, hostname)
+
+       go func() {
+               sigChan := make(chan os.Signal, 1)
+               signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+               <-sigChan
+               log.Println("Shutting down server...")
+               server.GracefulStop()
+       }()
+
+       if err := server.Serve(lis); err != nil {
+               log.Fatalf("Failed to serve: %v", err)
+       }
+}
diff --git a/test/grpc-proxyless/generate-proto.sh 
b/test/grpc-proxyless/generate-proto.sh
new file mode 100755
index 00000000..99ead4b3
--- /dev/null
+++ b/test/grpc-proxyless/generate-proto.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+set -e
+cd "$(dirname "$0")"
+
+# This script can be used once a working local protoc is available.
+# For now, the proto files are generated in the Dockerfile.
+echo "Proto files are generated in the Dockerfile during the image build"
+echo "To generate them locally, fix your protoc installation first:"
+echo "  brew reinstall protobuf abseil"
+echo ""
+echo "Then run:"
+echo "  protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. 
--go-grpc_opt=paths=source_relative proto/echo.proto"
diff --git a/test/grpc-proxyless/go.mod b/test/grpc-proxyless/go.mod
new file mode 100644
index 00000000..bef17d48
--- /dev/null
+++ b/test/grpc-proxyless/go.mod
@@ -0,0 +1,29 @@
+module github.com/apache/dubbo-kubernetes/test/grpc-proxyless
+
+go 1.24.0
+
+require (
+       google.golang.org/grpc v1.76.0
+       google.golang.org/protobuf v1.36.10
+)
+
+require (
+       cel.dev/expr v0.24.0 // indirect
+       cloud.google.com/go/compute/metadata v0.7.0 // indirect
+       github.com/cespare/xxhash/v2 v2.3.0 // indirect
+       github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
+       github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect
+       github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
+       github.com/go-jose/go-jose/v4 v4.1.2 // indirect
+       github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 
// indirect
+       github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect
+       github.com/zeebo/errs v1.4.0 // indirect
+       golang.org/x/crypto v0.43.0 // indirect
+       golang.org/x/net v0.46.0 // indirect
+       golang.org/x/oauth2 v0.30.0 // indirect
+       golang.org/x/sync v0.17.0 // indirect
+       golang.org/x/sys v0.37.0 // indirect
+       golang.org/x/text v0.30.0 // indirect
+       google.golang.org/genproto/googleapis/api 
v0.0.0-20250804133106-a7a43d27e69b // indirect
+       google.golang.org/genproto/googleapis/rpc 
v0.0.0-20251103181224-f26f9409b101 // indirect
+)
diff --git a/test/grpc-proxyless/go.sum b/test/grpc-proxyless/go.sum
new file mode 100644
index 00000000..a0ddf76c
--- /dev/null
+++ b/test/grpc-proxyless/go.sum
@@ -0,0 +1,76 @@
+cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
+cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
+cloud.google.com/go/compute/metadata v0.7.0 
h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU=
+cloud.google.com/go/compute/metadata v0.7.0/go.mod 
h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo=
+github.com/cespare/xxhash/v2 v2.3.0 
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 
h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
+github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod 
h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
+github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/envoyproxy/go-control-plane v0.13.4 
h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
+github.com/envoyproxy/go-control-plane v0.13.4/go.mod 
h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA=
+github.com/envoyproxy/go-control-plane/envoy v1.32.4 
h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A=
+github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod 
h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
+github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 
h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI=
+github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod 
h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4=
+github.com/envoyproxy/protoc-gen-validate v1.2.1 
h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
+github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod 
h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
+github.com/go-jose/go-jose/v4 v4.1.2 
h1:TK/7NqRQZfgAh+Td8AlsrvtPoUyiHh0LqVvokh+1vHI=
+github.com/go-jose/go-jose/v4 v4.1.2/go.mod 
h1:22cg9HWM1pOlnRiY+9cQYJ9XHmya1bYW8OeDM6Ku6Oo=
+github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
+github.com/go-logr/logr v1.4.3/go.mod 
h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod 
h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/golang/protobuf v1.5.4 
h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
+github.com/golang/protobuf v1.5.4/go.mod 
h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod 
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 
h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
+github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod 
h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
+github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/spiffe/go-spiffe/v2 v2.5.0 
h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
+github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod 
h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
+github.com/stretchr/testify v1.10.0 
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+github.com/stretchr/testify v1.10.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM=
+github.com/zeebo/errs v1.4.0/go.mod 
h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
+go.opentelemetry.io/auto/sdk v1.1.0 
h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
+go.opentelemetry.io/auto/sdk v1.1.0/go.mod 
h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
+go.opentelemetry.io/otel v1.37.0 
h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
+go.opentelemetry.io/otel v1.37.0/go.mod 
h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
+go.opentelemetry.io/otel/metric v1.37.0 
h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE=
+go.opentelemetry.io/otel/metric v1.37.0/go.mod 
h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E=
+go.opentelemetry.io/otel/sdk v1.37.0 
h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI=
+go.opentelemetry.io/otel/sdk v1.37.0/go.mod 
h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg=
+go.opentelemetry.io/otel/sdk/metric v1.37.0 
h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc=
+go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod 
h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps=
+go.opentelemetry.io/otel/trace v1.37.0 
h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
+go.opentelemetry.io/otel/trace v1.37.0/go.mod 
h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
+golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
+golang.org/x/crypto v0.43.0/go.mod 
h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
+golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
+golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
+golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
+golang.org/x/oauth2 v0.30.0/go.mod 
h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU=
+golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
+golang.org/x/sync v0.17.0/go.mod 
h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
+golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
+golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
+golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
+golang.org/x/text v0.30.0/go.mod 
h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
+gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
+gonum.org/v1/gonum v0.16.0/go.mod 
h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
+google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b 
h1:ULiyYQ0FdsJhwwZUwbaXpZF5yUE3h+RA+gxvBu37ucc=
+google.golang.org/genproto/googleapis/api 
v0.0.0-20250804133106-a7a43d27e69b/go.mod 
h1:oDOGiMSXHL4sDTJvFvIB9nRQCGdLP1o/iVaqQK8zB+M=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101 
h1:tRPGkdGHuewF4UisLzzHHr1spKw92qLM98nIzxbC0wY=
+google.golang.org/genproto/googleapis/rpc 
v0.0.0-20251103181224-f26f9409b101/go.mod 
h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
+google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
+google.golang.org/grpc v1.76.0/go.mod 
h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
+google.golang.org/protobuf v1.36.10 
h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
+google.golang.org/protobuf v1.36.10/go.mod 
h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/test/grpc-proxyless/producer/main.go 
b/test/grpc-proxyless/producer/main.go
new file mode 100644
index 00000000..b2284a20
--- /dev/null
+++ b/test/grpc-proxyless/producer/main.go
@@ -0,0 +1,401 @@
+package main
+
+import (
+       "context"
+       "encoding/json"
+       "flag"
+       "fmt"
+       "log"
+       "net"
+       "os"
+       "os/signal"
+       "strings"
+       "sync"
+       "syscall"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/connectivity"
+       "google.golang.org/grpc/credentials/insecure"
+       xdscreds "google.golang.org/grpc/credentials/xds"
+       "google.golang.org/grpc/reflection"
+       _ "google.golang.org/grpc/xds"
+
+       pb "github.com/apache/dubbo-kubernetes/test/grpc-proxyless/proto"
+)
+
+var (
+       target     = flag.String("target", "", "Target service address with 
xds:/// scheme (optional)")
+       count      = flag.Int("count", 5, "Number of requests to send")
+       port       = flag.Int("port", 17171, "gRPC server port for ForwardEcho 
testing")
+       testServer *grpc.Server
+)
+
+func main() {
+       flag.Parse()
+
+       go startTestServer(*port)
+
+       if *target != "" {
+               testDirectConnection(*target, *count)
+       }
+
+       log.Printf("Producer running. Test server listening on port %d for 
ForwardEcho", *port)
+
+       sigChan := make(chan os.Signal, 1)
+       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+       <-sigChan
+       log.Println("Shutting down...")
+
+       if testServer != nil {
+               log.Println("Stopping test server...")
+               testServer.GracefulStop()
+       }
+}
+
+func testDirectConnection(target string, count int) {
+       creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
+               FallbackCreds: insecure.NewCredentials(),
+       })
+       if err != nil {
+               log.Fatalf("Failed to create xDS client credentials: %v", err)
+       }
+
+       ctx := context.Background()
+       conn, err := grpc.DialContext(ctx, target, 
grpc.WithTransportCredentials(creds))
+       if err != nil {
+               log.Fatalf("Failed to connect: %v", err)
+       }
+       defer conn.Close()
+
+       client := pb.NewEchoServiceClient(conn)
+
+       log.Printf("Connected to %s, sending %d requests...", target, count)
+
+       for i := 0; i < count; i++ {
+               req := &pb.EchoRequest{
+                       Message: fmt.Sprintf("Hello from producer [%d]", i+1),
+               }
+
+               reqCtx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               resp, err := client.Echo(reqCtx, req)
+               cancel()
+
+               if err != nil {
+                       log.Printf("Request %d failed: %v", i+1, err)
+                       continue
+               }
+
+               if resp == nil {
+                       log.Printf("Request %d failed: response is nil", i+1)
+                       continue
+               }
+
+               log.Printf("Request %d: Response=%s, Hostname=%s", i+1, 
resp.Message, resp.Hostname)
+               time.Sleep(500 * time.Millisecond)
+       }
+
+       log.Println("All requests completed")
+}
+
+func startTestServer(port int) {
+       lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
+       if err != nil {
+               log.Printf("Failed to listen on port %d: %v", port, err)
+               return
+       }
+
+       testServer = grpc.NewServer()
+       pb.RegisterEchoTestServiceServer(testServer, &testServerImpl{
+               connCache: make(map[string]*grpc.ClientConn),
+       })
+       reflection.Register(testServer)
+
+       log.Printf("Test server listening on port %d for ForwardEcho 
(reflection enabled)", port)
+       if err := testServer.Serve(lis); err != nil {
+               log.Printf("Test server error: %v", err)
+       }
+}
+
+type testServerImpl struct {
+       pb.UnimplementedEchoTestServiceServer
+       // Connection cache: map from URL to gRPC connection
+       connCache map[string]*grpc.ClientConn
+       connMutex sync.RWMutex
+}
+
+func (s *testServerImpl) ForwardEcho(ctx context.Context, req 
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
+       if req == nil {
+               return nil, fmt.Errorf("request is nil")
+       }
+
+       if req.Url == "" {
+               return nil, fmt.Errorf("url is required")
+       }
+
+       count := req.Count
+       if count < 0 {
+               count = 0
+       }
+       if count > 100 {
+               count = 100
+       }
+
+       log.Printf("ForwardEcho: url=%s, count=%d", req.Url, count)
+
+       // Check bootstrap configuration
+       bootstrapPath := os.Getenv("GRPC_XDS_BOOTSTRAP")
+       if bootstrapPath == "" {
+               return nil, fmt.Errorf("GRPC_XDS_BOOTSTRAP environment variable 
is not set")
+       }
+
+       // Verify bootstrap file exists
+       if _, err := os.Stat(bootstrapPath); os.IsNotExist(err) {
+               return nil, fmt.Errorf("bootstrap file does not exist: %s", 
bootstrapPath)
+       }
+
+       // Read bootstrap file to verify UDS socket
+       bootstrapData, err := os.ReadFile(bootstrapPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to read bootstrap file: %v", err)
+       }
+
+       var bootstrapJSON map[string]interface{}
+       if err := json.Unmarshal(bootstrapData, &bootstrapJSON); err != nil {
+               return nil, fmt.Errorf("failed to parse bootstrap file: %v", 
err)
+       }
+
+       // Extract UDS socket path
+       var udsPath string
+       if xdsServers, ok := bootstrapJSON["xds_servers"].([]interface{}); ok 
&& len(xdsServers) > 0 {
+               if server, ok := xdsServers[0].(map[string]interface{}); ok {
+                       if serverURI, ok := server["server_uri"].(string); ok {
+                               if strings.HasPrefix(serverURI, "unix://") {
+                                       udsPath = strings.TrimPrefix(serverURI, 
"unix://")
+                                       if _, err := os.Stat(udsPath); 
os.IsNotExist(err) {
+                                               return nil, fmt.Errorf("UDS 
socket does not exist: %s", udsPath)
+                                       }
+                               }
+                       }
+               }
+       }
+
+       // CRITICAL: Reuse connections to avoid creating new xDS connections 
for each RPC call
+       // This prevents the RDS request loop issue and ensures stable 
connection state
+       s.connMutex.RLock()
+       conn, exists := s.connCache[req.Url]
+       s.connMutex.RUnlock()
+
+       // Check if cached connection is still valid (not closed/shutdown)
+       if exists && conn != nil {
+               state := conn.GetState()
+               if state == connectivity.Shutdown {
+                       // Connection is closed, remove from cache and create 
new one
+                       log.Printf("ForwardEcho: cached connection for %s is 
SHUTDOWN, removing from cache", req.Url)
+                       s.connMutex.Lock()
+                       delete(s.connCache, req.Url)
+                       conn = nil
+                       exists = false
+                       s.connMutex.Unlock()
+               }
+       }
+
+       if !exists || conn == nil {
+               // Create new connection
+               s.connMutex.Lock()
+               // Double-check after acquiring write lock
+               if conn, exists = s.connCache[req.Url]; !exists || conn == nil {
+                       // Create xDS client credentials
+                       creds, err := 
xdscreds.NewClientCredentials(xdscreds.ClientOptions{
+                               FallbackCreds: insecure.NewCredentials(),
+                       })
+                       if err != nil {
+                               s.connMutex.Unlock()
+                               return nil, fmt.Errorf("failed to create xDS 
client credentials: %v", err)
+                       }
+
+                       // Dial with xDS URL - use background context, not the 
request context
+                       // The request context might timeout before xDS 
configuration is received
+                       log.Printf("ForwardEcho: creating new connection for 
%s...", req.Url)
+                       conn, err = grpc.DialContext(context.Background(), 
req.Url, grpc.WithTransportCredentials(creds))
+                       if err != nil {
+                               s.connMutex.Unlock()
+                               return nil, fmt.Errorf("failed to dial %s: %v", 
req.Url, err)
+                       }
+                       s.connCache[req.Url] = conn
+                       log.Printf("ForwardEcho: cached connection for %s", 
req.Url)
+               }
+               s.connMutex.Unlock()
+       } else {
+               log.Printf("ForwardEcho: reusing cached connection for %s 
(state: %v)", req.Url, conn.GetState())
+       }
+
+       initialState := conn.GetState()
+       log.Printf("ForwardEcho: initial connection state: %v", initialState)
+
+       // CRITICAL: If the connection is already READY, use it directly without waiting.
+       // Cached connections should already be in the READY state.
+       if initialState == connectivity.Ready {
+               log.Printf("ForwardEcho: connection is already READY, 
proceeding with RPC calls")
+       } else {
+               // Only wait for new connections or connections that are not 
READY
+               // For gRPC xDS proxyless, we need to wait for the client to 
receive and process LDS/CDS/EDS
+               // The connection state may transition: IDLE -> CONNECTING -> 
READY (or TRANSIENT_FAILURE -> CONNECTING -> READY)
+               log.Printf("ForwardEcho: waiting for xDS configuration to be 
processed and connection to be ready (30 seconds)...")
+
+               // Wait for state changes with multiple attempts
+               maxWait := 30 * time.Second
+
+               // Wait for state changes, allowing multiple state transitions
+               // CRITICAL: Don't exit on TRANSIENT_FAILURE - it may recover 
to READY
+               stateChanged := false
+               currentState := initialState
+               startTime := time.Now()
+               lastStateChangeTime := startTime
+
+               for time.Since(startTime) < maxWait {
+                       if currentState == connectivity.Ready {
+                               log.Printf("ForwardEcho: connection is READY 
after %v", time.Since(startTime))
+                               stateChanged = true
+                               break
+                       }
+
+                       // Only exit on Shutdown, not on TransientFailure (it 
may recover)
+                       if currentState == connectivity.Shutdown {
+                               log.Printf("ForwardEcho: connection in %v state 
after %v, cannot recover", currentState, time.Since(startTime))
+                               break
+                       }
+
+                       // Wait for state change with remaining timeout
+                       remaining := maxWait - time.Since(startTime)
+                       if remaining <= 0 {
+                               break
+                       }
+
+                       // Use shorter timeout for each WaitForStateChange call 
to allow periodic checks
+                       waitTimeout := remaining
+                       if waitTimeout > 5*time.Second {
+                               waitTimeout = 5 * time.Second
+                       }
+
+                       stateCtx, stateCancel := 
context.WithTimeout(context.Background(), waitTimeout)
+                       if conn.WaitForStateChange(stateCtx, currentState) {
+                               newState := conn.GetState()
+                               elapsed := time.Since(startTime)
+                               log.Printf("ForwardEcho: connection state 
changed from %v to %v after %v", currentState, newState, elapsed)
+                               stateChanged = true
+                               currentState = newState
+                               lastStateChangeTime = time.Now()
+
+                               // If READY, we're done
+                               if newState == connectivity.Ready {
+                                       stateCancel()
+                                       break
+                               }
+
+                               // If we're in TRANSIENT_FAILURE, continue 
waiting - it may recover
+                               // gRPC xDS client will retry connection when 
endpoints become available
+                               if newState == connectivity.TransientFailure {
+                                       log.Printf("ForwardEcho: connection in 
TRANSIENT_FAILURE, continuing to wait for recovery (remaining: %v)", 
maxWait-elapsed)
+                               }
+                       } else {
+                               // Timeout waiting for state change - check if 
we should continue
+                               elapsed := time.Since(startTime)
+                               if currentState == 
connectivity.TransientFailure {
+                                       // If we've been in TRANSIENT_FAILURE 
for a while, continue waiting
+                                       // The connection may recover when 
endpoints become available
+                                       if time.Since(lastStateChangeTime) < 
10*time.Second {
+                                               log.Printf("ForwardEcho: still 
in TRANSIENT_FAILURE after %v, continuing to wait (remaining: %v)", elapsed, 
maxWait-elapsed)
+                                       } else {
+                                               log.Printf("ForwardEcho: no 
state change after %v, current state: %v (remaining: %v)", elapsed, 
currentState, maxWait-elapsed)
+                                       }
+                               } else {
+                                       log.Printf("ForwardEcho: no state 
change after %v, current state: %v (remaining: %v)", elapsed, currentState, 
maxWait-elapsed)
+                               }
+                       }
+                       stateCancel()
+               }
+
+               finalState := conn.GetState()
+               log.Printf("ForwardEcho: final connection state: %v 
(stateChanged=%v, waited=%v)", finalState, stateChanged, time.Since(startTime))
+
+               // If connection is not READY, log a warning but proceed anyway
+               // The first RPC call may trigger connection establishment
+               if finalState != connectivity.Ready {
+                       log.Printf("ForwardEcho: WARNING - connection is not 
READY (state=%v), but proceeding with RPC calls", finalState)
+               }
+       }
+
+       // Create client and make RPC calls
+       client := pb.NewEchoServiceClient(conn)
+       output := make([]string, 0, count)
+
+       log.Printf("ForwardEcho: sending %d requests...", count)
+       for i := int32(0); i < count; i++ {
+               echoReq := &pb.EchoRequest{
+                       Message: fmt.Sprintf("Request %d", i+1),
+               }
+
+               currentState := conn.GetState()
+               log.Printf("ForwardEcho: sending request %d (connection state: 
%v)...", i+1, currentState)
+
+               // Use longer timeout for first request to allow connection 
establishment
+               // For subsequent requests, use shorter timeout but still allow 
for retries
+               timeout := 30 * time.Second
+               if i > 0 {
+                       timeout = 20 * time.Second
+               }
+
+               reqCtx, reqCancel := context.WithTimeout(context.Background(), 
timeout)
+               reqStartTime := time.Now()
+               resp, err := client.Echo(reqCtx, echoReq)
+               duration := time.Since(reqStartTime)
+               reqCancel()
+
+               // Check connection state after RPC call
+               stateAfterRPC := conn.GetState()
+               log.Printf("ForwardEcho: request %d completed in %v, connection 
state: %v (was %v)", i+1, duration, stateAfterRPC, currentState)
+
+               if err != nil {
+                       log.Printf("ForwardEcho: request %d failed: %v", i+1, 
err)
+                       output = append(output, fmt.Sprintf("[%d] Error: %v", 
i, err))
+
+                       // If connection is in TRANSIENT_FAILURE, wait a bit 
before next request
+                       // to allow gRPC client to retry and recover
+                       if stateAfterRPC == connectivity.TransientFailure && i 
< count-1 {
+                               waitTime := 2 * time.Second
+                               log.Printf("ForwardEcho: connection in 
TRANSIENT_FAILURE, waiting %v before next request...", waitTime)
+                               time.Sleep(waitTime)
+
+                               // Check if connection recovered
+                               newState := conn.GetState()
+                               if newState == connectivity.Ready {
+                                       log.Printf("ForwardEcho: connection 
recovered to READY after wait")
+                               } else {
+                                       log.Printf("ForwardEcho: connection 
state after wait: %v", newState)
+                               }
+                       }
+                       continue
+               }
+
+               if resp == nil {
+                       log.Printf("ForwardEcho: request %d failed: response is 
nil", i+1)
+                       output = append(output, fmt.Sprintf("[%d] Error: 
response is nil", i))
+                       continue
+               }
+
+               log.Printf("ForwardEcho: request %d succeeded: Hostname=%s", 
i+1, resp.Hostname)
+               output = append(output, fmt.Sprintf("[%d body] Hostname=%s", i, 
resp.Hostname))
+
+               // Small delay between successful requests to avoid 
overwhelming the server
+               if i < count-1 {
+                       time.Sleep(100 * time.Millisecond)
+               }
+       }
+
+       log.Printf("ForwardEcho: completed %d requests", count)
+
+       return &pb.ForwardEchoResponse{
+               Output: output,
+       }, nil
+}
diff --git a/test/grpc-proxyless/proto/echo.proto 
b/test/grpc-proxyless/proto/echo.proto
new file mode 100644
index 00000000..cb36e618
--- /dev/null
+++ b/test/grpc-proxyless/proto/echo.proto
@@ -0,0 +1,42 @@
+syntax = "proto3";
+
+package echo;
+
+option go_package = 
"github.com/apache/dubbo-kubernetes/test/grpc-proxyless/proto";
+
+// Echo service for testing gRPC proxyless
+service EchoService {
+  // Echo returns the same message sent
+  rpc Echo(EchoRequest) returns (EchoResponse);
+  
+  // StreamEcho streams back the same message
+  rpc StreamEcho(EchoRequest) returns (stream EchoResponse);
+}
+
+// EchoTestService for grpcurl testing (similar to Istio's echo service)
+service EchoTestService {
+  // ForwardEcho forwards a request to another service and returns the response
+  rpc ForwardEcho(ForwardEchoRequest) returns (ForwardEchoResponse);
+}
+
+message EchoRequest {
+  string message = 1;
+}
+
+message EchoResponse {
+  string message = 1;
+  string hostname = 2;
+}
+
+message ForwardEchoRequest {
+  string url = 1;  // Target URL (e.g., 
"xds:///consumer.grpc-proxyless.svc.cluster.local:7070")
+  int32 count = 2; // Number of requests to send
+  map<string, string> headers = 3;
+  int32 timeout = 4; // Timeout in seconds
+  bool h2 = 5; // Use HTTP/2
+  bool insecure = 6; // Use insecure connection
+}
+
+message ForwardEchoResponse {
+  repeated string output = 1; // Output lines from the requests
+}
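As a reference for exercising this API from Go instead of grpcurl, here is a small driver sketch that calls `ForwardEcho` on the producer's test port and lets it fan the Echo calls out to the consumer over `xds:///`. It assumes the generated stubs from this proto are importable under the `go_package` above and that port 17171 has been port-forwarded to localhost (as in `test-commands.sh`); `grpc.NewClient` comes from the grpc-go version pinned in `go.mod`.

```go
// Sketch of a ForwardEcho test driver. Assumes the producer's test port 17171
// is port-forwarded to localhost and the generated echo stubs are available.
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "github.com/apache/dubbo-kubernetes/test/grpc-proxyless/proto"
)

func main() {
	conn, err := grpc.NewClient("localhost:17171", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	// Ask the producer to send 5 Echo requests to the consumer through xDS.
	resp, err := pb.NewEchoTestServiceClient(conn).ForwardEcho(ctx, &pb.ForwardEchoRequest{
		Url:   "xds:///consumer.grpc-proxyless.svc.cluster.local:7070",
		Count: 5,
	})
	if err != nil {
		log.Fatalf("ForwardEcho: %v", err)
	}
	for _, line := range resp.Output {
		log.Println(line)
	}
}
```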
diff --git a/test/grpc-proxyless/proto/gen.sh b/test/grpc-proxyless/proto/gen.sh
new file mode 100755
index 00000000..fc0bea40
--- /dev/null
+++ b/test/grpc-proxyless/proto/gen.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+set -e
+cd "$(dirname "$0")/.."
+export PATH=$PATH:$(go env GOPATH)/bin
+protoc --go_out=. --go_opt=paths=source_relative \
+       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
+       proto/echo.proto
diff --git a/test/grpc-proxyless/test-commands.sh 
b/test/grpc-proxyless/test-commands.sh
new file mode 100755
index 00000000..d78b8d46
--- /dev/null
+++ b/test/grpc-proxyless/test-commands.sh
@@ -0,0 +1,49 @@
+#!/bin/bash
+set -e
+
+NAMESPACE="grpc-proxyless"
+
+echo "=== gRPC Proxyless 测试命令 ==="
+echo ""
+
+echo "=== 快速测试步骤 ==="
+echo ""
+echo "# 1. 重新构建 producer 镜像(代码已修改)"
+echo "cd test/grpc-proxyless"
+echo "docker build -f Dockerfile.producer -t mfordjody/grpc-producer:latest ."
+echo "docker push mfordjody/grpc-producer:latest  # 如果需要推送到 registry"
+echo ""
+echo "# 2. 重启 producer deployment"
+echo "kubectl rollout restart deployment/producer -n grpc-proxyless"
+echo "kubectl rollout status deployment/producer -n grpc-proxyless"
+echo ""
+echo "# 3. 等待 pod 就绪"
+echo "kubectl wait --for=condition=ready pod -l app=producer -n grpc-proxyless 
--timeout=60s"
+echo ""
+echo "# 4. 查看 producer 应用日志(观察连接状态和 RPC 调用)"
+echo "kubectl logs -f -l app=producer -c app -n grpc-proxyless --tail=100"
+echo ""
+echo "# 5. 查看 producer xDS proxy 日志(观察 EDS 响应)"
+echo "kubectl logs -f -l app=producer -c dubbo-proxy -n grpc-proxyless 
--tail=100 | grep -i 'xds\|eds\|connection'"
+echo ""
+echo "# 6. 测试 ForwardEcho(需要先启动 port-forward)"
+echo "# 在另一个终端执行:"
+echo "kubectl port-forward -n grpc-proxyless \$(kubectl get pod -l 
app=producer -n grpc-proxyless -o jsonpath='{.items[0].metadata.name}') 
17171:17171"
+echo ""
+echo "# 然后执行测试(注意:-d 参数必须在地址之前!):"
+echo ""
+echo "# 方法1: 直接使用 -d 参数(推荐)"
+echo "grpcurl -plaintext -d '{\"url\": 
\"xds:///consumer.grpc-proxyless.svc.cluster.local:7070\", \"count\": 5}' 
localhost:17171 echo.EchoTestService/ForwardEcho"
+echo ""
+echo "# 方法2: 使用标准输入(如果方法1不行)"
+echo "echo '{\"url\": 
\"xds:///consumer.grpc-proxyless.svc.cluster.local:7070\", \"count\": 5}' | 
grpcurl -plaintext -d @ localhost:17171 echo.EchoTestService/ForwardEcho"
+echo ""
+echo "# 方法3: 使用文件"
+echo "cat > /tmp/request.json << 'JSON'"
+echo "{\"url\": \"xds:///consumer.grpc-proxyless.svc.cluster.local:7070\", 
\"count\": 5}"
+echo "JSON"
+echo "grpcurl -plaintext -d @ /tmp/request.json localhost:17171 
echo.EchoTestService/ForwardEcho"
+echo ""
+echo "# 7. 查看控制平面日志(检查 EDS 推送)"
+echo "kubectl logs -f -l app=dubbod -n dubbo-system --tail=100 | grep -i 
'eds\|endpoint\|consumer'"
+echo ""
diff --git a/test/grpc-proxyless/test.sh b/test/grpc-proxyless/test.sh
new file mode 100755
index 00000000..1c20b2c9
--- /dev/null
+++ b/test/grpc-proxyless/test.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+set -e
+
+NAMESPACE="grpc-proxyless"
+
+echo "=== Testing the gRPC Proxyless push-loop fix ==="
+echo ""
+
+# 1. Check the control plane logs (confirm the push loop is gone)
+echo "1. Check the control plane logs (watch for ~10 seconds and confirm the push loop no longer occurs)..."
+echo "   Run: kubectl logs -f <dubbod-pod-name> -n <namespace> | grep -E 'LDS.*PUSH|resources:'"
+echo "   Expected: no more rapid alternation between resources:1 and resources:13"
+echo ""
+
+# 2. Check the xDS proxy logs
+echo "2. Check the xDS proxy logs (confirm requests are no longer forwarded repeatedly)..."
+CONSUMER_POD=$(kubectl get pods -n $NAMESPACE -l app=consumer -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")
+if [ -n "$CONSUMER_POD" ]; then
+    echo "   Checking pod: $CONSUMER_POD"
+    echo "   Run: kubectl logs $CONSUMER_POD -c dubbo-proxy -n $NAMESPACE | grep -E 'forwarding request.*LDS' | wc -l"
+    echo "   Expected: a small count, with no more frequent forwarding"
+else
+    echo "   consumer pod not found"
+fi
+echo ""
+
+# 3. Get service information
+echo "3. Getting service information..."
+kubectl get svc -n $NAMESPACE
+echo ""
+
+# 4. Test with grpcurl
+echo "4. Testing the services with grpcurl..."
+echo ""
+
+# Test ForwardEcho (through the producer)
+PRODUCER_POD=$(kubectl get pods -n $NAMESPACE -l app=producer -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")
+if [ -n "$PRODUCER_POD" ]; then
+    echo "   Test ForwardEcho (through the producer pod):"
+    echo "   kubectl port-forward -n $NAMESPACE $PRODUCER_POD 17171:17171 &"
+    echo "   grpcurl -plaintext -d '{\"url\": \"xds:///consumer.grpc-proxyless.svc.cluster.local:7070\", \"count\": 1, \"headers\": {}, \"timeout\": 5}' localhost:17171 echo.EchoTestService/ForwardEcho"
+    echo ""
+fi
+
+# Test Echo (directly against the consumer)
+CONSUMER_POD=$(kubectl get pods -n $NAMESPACE -l app=consumer -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")
+if [ -n "$CONSUMER_POD" ]; then
+    echo "   Test Echo (directly against the consumer pod):"
+    echo "   kubectl port-forward -n $NAMESPACE $CONSUMER_POD 17070:17070 &"
+    echo "   grpcurl -plaintext -d '{\"message\": \"test\"}' localhost:17070 echo.EchoService/Echo"
+    echo ""
+fi
+
+echo "=== Test complete ==="
+
+
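
The direct Echo check printed by test.sh has a similarly small Go equivalent, shown here for completeness. Again this is only a sketch: it assumes the stubs generated by gen.sh (standard protoc-gen-go naming) and the kubectl port-forward to the consumer pod on localhost:17070 from the script above.

    package main

    import (
        "context"
        "log"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"

        pb "github.com/apache/dubbo-kubernetes/test/grpc-proxyless/proto"
    )

    func main() {
        // Plaintext connection to the consumer's Echo port (via port-forward).
        conn, err := grpc.NewClient("localhost:17070", grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
            log.Fatalf("connect: %v", err)
        }
        defer conn.Close()

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        resp, err := pb.NewEchoServiceClient(conn).Echo(ctx, &pb.EchoRequest{Message: "test"})
        if err != nil {
            log.Fatalf("Echo: %v", err)
        }
        log.Printf("message=%q hostname=%s", resp.Message, resp.Hostname)
    }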
