This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b7b4698 Update grpc security and traffic sample (#825)
1b7b4698 is described below

commit 1b7b469894c118b62fc0953d97cec4041affed2f
Author: mfordjody <[email protected]>
AuthorDate: Sat Nov 22 21:36:43 2025 +0800

    Update grpc security and traffic sample (#825)
---
 dubbod/planet/pkg/bootstrap/server.go              |   9 +-
 dubbod/planet/pkg/model/authentication.go          |  98 ++-
 dubbod/planet/pkg/model/push_context.go            | 157 ++++-
 dubbod/planet/pkg/model/subsetrule.go              |  29 +-
 dubbod/planet/pkg/networking/grpcgen/cds.go        | 223 +++++-
 dubbod/planet/pkg/networking/grpcgen/grpcgen.go    |  33 +
 dubbod/planet/pkg/networking/grpcgen/lds.go        |  76 ++-
 dubbod/planet/pkg/xds/cds.go                       |  12 +
 manifests/charts/base/files/crd-all.yaml           |   8 +-
 .../dubbo-discovery/files/grpc-agent.yaml          |   9 +
 pkg/dubbo-agent/agent.go                           |  73 +-
 samples/grpc-app/README.md                         |  51 +-
 samples/grpc-app/grpc-app.yaml                     |  65 +-
 tests/grpc-app/README.md                           |   4 +-
 tests/grpc-app/consumer/main.go                    | 753 +++++++++++++++------
 tests/grpc-app/docker/dockerfile.consumer          |   3 +
 tests/grpc-app/producer/main.go                    | 574 ++++++----------
 17 files changed, 1476 insertions(+), 701 deletions(-)

diff --git a/dubbod/planet/pkg/bootstrap/server.go 
b/dubbod/planet/pkg/bootstrap/server.go
index aa363d4f..d567726f 100644
--- a/dubbod/planet/pkg/bootstrap/server.go
+++ b/dubbod/planet/pkg/bootstrap/server.go
@@ -482,10 +482,11 @@ func (s *Server) initRegistryEventHandlers() {
                // Log the config change
                log.Infof("configHandler: %s event for %s/%s/%s", event, 
configKey.Kind, configKey.Namespace, configKey.Name)
 
-               // CRITICAL: For SubsetRule and ServiceRoute changes, we need 
Full push to ensure
-               // PushContext is re-initialized and configuration is reloaded
-               // This is because these configs affect CDS/RDS generation and 
need complete context refresh
-               needsFullPush := configKind == kind.SubsetRule || configKind == 
kind.ServiceRoute
+               // CRITICAL: Some configs 
(SubsetRule/ServiceRoute/PeerAuthentication) require Full push to ensure
+               // PushContext is re-initialized and configuration is reloaded.
+               // PeerAuthentication must rebuild AuthenticationPolicies to 
enable STRICT mTLS on LDS; without
+               // a full push the cached PushContext would continue serving 
plaintext listeners.
+               needsFullPush := configKind == kind.SubsetRule || configKind == 
kind.ServiceRoute || configKind == kind.PeerAuthentication
 
                // Trigger ConfigUpdate to push changes to all connected proxies
                s.XDSServer.ConfigUpdate(&model.PushRequest{
diff --git a/dubbod/planet/pkg/model/authentication.go 
b/dubbod/planet/pkg/model/authentication.go
index c0deaaea..5fa8b7dc 100644
--- a/dubbod/planet/pkg/model/authentication.go
+++ b/dubbod/planet/pkg/model/authentication.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/config"
        "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
        "istio.io/api/security/v1beta1"
+       typev1beta1 "istio.io/api/type/v1beta1"
 )
 
 type MutualTLSMode int
@@ -75,7 +76,8 @@ func (policy *AuthenticationPolicies) 
addPeerAuthentication(configs []config.Con
        for _, config := range configs {
                versions = append(versions, 
config.UID+"."+config.ResourceVersion)
                spec := config.Spec.(*v1beta1.PeerAuthentication)
-               if spec.Selector == nil || len(spec.Selector.MatchLabels) == 0 {
+               selector := spec.GetSelector()
+               if selector == nil || len(selector.MatchLabels) == 0 {
                        if t, ok := 
seenNamespaceOrMeshConfig[config.Namespace]; ok {
                                log.Warnf(
                                        "Namespace/mesh-level 
PeerAuthentication is already defined for %q at time %v. Ignore %q which was 
created at time %v",
@@ -132,3 +134,97 @@ func ConvertToMutualTLSMode(mode 
v1beta1.PeerAuthentication_MutualTLS_Mode) Mutu
                return MTLSUnknown
        }
 }
+
+func (policy *AuthenticationPolicies) EffectiveMutualTLSMode(namespace string, 
workloadLabels map[string]string, port uint32) MutualTLSMode {
+       if policy == nil {
+               return MTLSUnknown
+       }
+
+       if namespace != "" {
+               if mode := policy.matchingPeerAuthentication(namespace, 
workloadLabels, port); mode != MTLSUnknown {
+                       return mode
+               }
+               if mode, ok := policy.namespaceMutualTLSMode[namespace]; ok && 
mode != MTLSUnknown {
+                       return mode
+               }
+       }
+
+       if policy.rootNamespace != "" {
+               if mode := 
policy.matchingPeerAuthentication(policy.rootNamespace, workloadLabels, port); 
mode != MTLSUnknown {
+                       return mode
+               }
+       }
+
+       if policy.globalMutualTLSMode != MTLSUnknown {
+               return policy.globalMutualTLSMode
+       }
+
+       return MTLSUnknown
+}
+
+func (policy *AuthenticationPolicies) matchingPeerAuthentication(namespace 
string, workloadLabels map[string]string, port uint32) MutualTLSMode {
+       configs := policy.peerAuthentications[namespace]
+       if len(configs) == 0 {
+               return MTLSUnknown
+       }
+
+       for _, cfg := range configs {
+               spec := cfg.Spec.(*v1beta1.PeerAuthentication)
+               if hasPeerAuthSelector(spec) && 
selectorMatchesWorkload(spec.GetSelector(), workloadLabels) {
+                       if mode := peerAuthenticationModeForPort(spec, port); 
mode != MTLSUnknown {
+                               return mode
+                       }
+               }
+       }
+
+       for _, cfg := range configs {
+               spec := cfg.Spec.(*v1beta1.PeerAuthentication)
+               if !hasPeerAuthSelector(spec) {
+                       if mode := peerAuthenticationModeForPort(spec, port); 
mode != MTLSUnknown {
+                               return mode
+                       }
+               }
+       }
+
+       return MTLSUnknown
+}
+
+func hasPeerAuthSelector(spec *v1beta1.PeerAuthentication) bool {
+       if spec == nil {
+               return false
+       }
+       selector := spec.GetSelector()
+       return selector != nil && len(selector.MatchLabels) > 0
+}
+
+func selectorMatchesWorkload(selector *typev1beta1.WorkloadSelector, 
workloadLabels map[string]string) bool {
+       if selector == nil || len(selector.MatchLabels) == 0 {
+               return true
+       }
+       if len(workloadLabels) == 0 {
+               return false
+       }
+       for k, v := range selector.MatchLabels {
+               if workloadLabels[k] != v {
+                       return false
+               }
+       }
+       return true
+}
+
+func peerAuthenticationModeForPort(spec *v1beta1.PeerAuthentication, port 
uint32) MutualTLSMode {
+       if spec == nil {
+               return MTLSUnknown
+       }
+       if port != 0 && spec.PortLevelMtls != nil {
+               if mtls, ok := spec.PortLevelMtls[port]; ok && mtls != nil {
+                       if mtls.Mode != 
v1beta1.PeerAuthentication_MutualTLS_UNSET {
+                               return ConvertToMutualTLSMode(mtls.Mode)
+                       }
+               }
+       }
+       if spec.Mtls != nil && spec.Mtls.Mode != 
v1beta1.PeerAuthentication_MutualTLS_UNSET {
+               return ConvertToMutualTLSMode(spec.Mtls.Mode)
+       }
+       return MTLSUnknown
+}
diff --git a/dubbod/planet/pkg/model/push_context.go 
b/dubbod/planet/pkg/model/push_context.go
index 7990cd3b..572f035b 100644
--- a/dubbod/planet/pkg/model/push_context.go
+++ b/dubbod/planet/pkg/model/push_context.go
@@ -60,20 +60,21 @@ var (
 )
 
 type PushContext struct {
-       Mesh              *meshconfig.MeshConfig `json:"-"`
-       initializeMutex   sync.Mutex
-       InitDone          atomic.Bool
-       Networks          *meshconfig.MeshNetworks
-       networkMgr        *NetworkManager
-       clusterLocalHosts ClusterLocalHosts
-       exportToDefaults  exportToDefaults
-       ServiceIndex      serviceIndex
-       serviceRouteIndex serviceRouteIndex
-       subsetRuleIndex   subsetRuleIndex
-       serviceAccounts   map[serviceAccountKey][]string
-       PushVersion       string
-       ProxyStatus       map[string]map[string]ProxyPushStatus
-       proxyStatusMutex  sync.RWMutex
+       Mesh                   *meshconfig.MeshConfig `json:"-"`
+       initializeMutex        sync.Mutex
+       InitDone               atomic.Bool
+       Networks               *meshconfig.MeshNetworks
+       networkMgr             *NetworkManager
+       clusterLocalHosts      ClusterLocalHosts
+       exportToDefaults       exportToDefaults
+       ServiceIndex           serviceIndex
+       serviceRouteIndex      serviceRouteIndex
+       subsetRuleIndex        subsetRuleIndex
+       serviceAccounts        map[serviceAccountKey][]string
+       AuthenticationPolicies *AuthenticationPolicies
+       PushVersion            string
+       ProxyStatus            map[string]map[string]ProxyPushStatus
+       proxyStatusMutex       sync.RWMutex
 }
 
 type PushRequest struct {
@@ -515,6 +516,7 @@ func (ps *PushContext) createNewContext(env *Environment) {
        ps.initServiceRegistry(env, nil)
        ps.initServiceRoutes(env)
        ps.initSubsetRules(env)
+       ps.initAuthenticationPolicies(env)
 }
 
 func (ps *PushContext) updateContext(env *Environment, oldPushContext 
*PushContext, pushReq *PushRequest) {
@@ -606,6 +608,41 @@ func (ps *PushContext) updateContext(env *Environment, 
oldPushContext *PushConte
                ps.subsetRuleIndex = oldPushContext.subsetRuleIndex
        }
 
+       authnPoliciesChanged := pushReq != nil && (pushReq.Full || 
HasConfigsOfKind(pushReq.ConfigsUpdated, kind.PeerAuthentication))
+       if authnPoliciesChanged || oldPushContext == nil || 
oldPushContext.AuthenticationPolicies == nil {
+               log.Infof("updateContext: PeerAuthentication changed (full=%v, 
configsUpdatedContainingPeerAuth=%v), rebuilding authentication policies",
+                       pushReq != nil && pushReq.Full, func() bool {
+                               if pushReq == nil {
+                                       return false
+                               }
+                               return HasConfigsOfKind(pushReq.ConfigsUpdated, 
kind.PeerAuthentication)
+                       }())
+               ps.initAuthenticationPolicies(env)
+       } else {
+               ps.AuthenticationPolicies = 
oldPushContext.AuthenticationPolicies
+       }
+}
+
+func (ps *PushContext) initAuthenticationPolicies(env *Environment) {
+       if env == nil {
+               ps.AuthenticationPolicies = nil
+               return
+       }
+       ps.AuthenticationPolicies = initAuthenticationPolicies(env)
+}
+
+func (ps *PushContext) InboundMTLSModeForProxy(proxy *Proxy, port uint32) 
MutualTLSMode {
+       if ps == nil || proxy == nil || ps.AuthenticationPolicies == nil {
+               return MTLSUnknown
+       }
+       var namespace string
+       if proxy.Metadata != nil {
+               namespace = proxy.Metadata.Namespace
+       }
+       if namespace == "" {
+               namespace = proxy.ConfigNamespace
+       }
+       return ps.AuthenticationPolicies.EffectiveMutualTLSMode(namespace, nil, 
port)
 }
 
 func (ps *PushContext) ServiceForHostname(proxy *Proxy, hostname host.Name) 
*Service {
@@ -776,13 +813,22 @@ func (ps *PushContext) setSubsetRules(configs 
[]config.Config) {
        log.Infof("setSubsetRules: indexed %d namespaces with local rules", 
len(namespaceLocalSubRules))
        for ns, rules := range namespaceLocalSubRules {
                totalRules := 0
-               for _, ruleList := range rules.specificSubRules {
+               for hostname, ruleList := range rules.specificSubRules {
                        totalRules += len(ruleList)
+                       // Log TLS configuration for each merged DestinationRule
+                       for _, rule := range ruleList {
+                               if dr, ok := 
rule.rule.Spec.(*networking.DestinationRule); ok {
+                                       hasTLS := dr.TrafficPolicy != nil && 
dr.TrafficPolicy.Tls != nil
+                                       tlsMode := "none"
+                                       if hasTLS {
+                                               tlsMode = 
dr.TrafficPolicy.Tls.Mode.String()
+                                       }
+                                       log.Infof("setSubsetRules: namespace 
%s, hostname %s: DestinationRule has %d subsets, TLS mode: %s",
+                                               ns, hostname, len(dr.Subsets), 
tlsMode)
+                               }
+                       }
                }
                log.Infof("setSubsetRules: namespace %s has %d DestinationRules 
with %d specific hostnames", ns, totalRules, len(rules.specificSubRules))
-               for hostname := range rules.specificSubRules {
-                       log.Debugf("setSubsetRules: namespace %s has rules for 
hostname %s", ns, hostname)
-               }
        }
        log.Infof("setSubsetRules: indexed %d namespaces with exported rules", 
len(exportedDestRulesByNamespace))
        if rootNamespaceLocalDestRules != nil {
@@ -804,8 +850,12 @@ func (ps *PushContext) initSubsetRules(env *Environment) {
        for i := range subRules {
                subRules[i] = configs[i]
                if dr, ok := configs[i].Spec.(*networking.DestinationRule); ok {
-                       log.Infof("initSubsetRules: SubsetRule %s/%s for host 
%s with %d subsets",
-                               configs[i].Namespace, configs[i].Name, dr.Host, 
len(dr.Subsets))
+                       tlsMode := "none"
+                       if dr.TrafficPolicy != nil && dr.TrafficPolicy.Tls != 
nil {
+                               tlsMode = dr.TrafficPolicy.Tls.Mode.String()
+                       }
+                       log.Infof("initSubsetRules: SubsetRule %s/%s for host 
%s with %d subsets, TLS mode: %s",
+                               configs[i].Namespace, configs[i].Name, dr.Host, 
len(dr.Subsets), tlsMode)
                }
        }
 
@@ -863,7 +913,13 @@ func (ps *PushContext) DestinationRuleForService(namespace 
string, hostname host
        if nsRules := ps.subsetRuleIndex.namespaceLocal[namespace]; nsRules != 
nil {
                log.Debugf("DestinationRuleForService: checking namespace-local 
rules for %s (found %d specific rules)", namespace, 
len(nsRules.specificSubRules))
                if dr := firstDestinationRule(nsRules, hostname); dr != nil {
-                       log.Infof("DestinationRuleForService: found 
DestinationRule in namespace-local index for %s/%s with %d subsets", namespace, 
hostname, len(dr.Subsets))
+                       hasTLS := dr.TrafficPolicy != nil && 
dr.TrafficPolicy.Tls != nil
+                       tlsMode := "none"
+                       if hasTLS {
+                               tlsMode = dr.TrafficPolicy.Tls.Mode.String()
+                       }
+                       log.Infof("DestinationRuleForService: found 
DestinationRule in namespace-local index for %s/%s with %d subsets (has 
TrafficPolicy: %v, has TLS: %v, TLS mode: %s)",
+                               namespace, hostname, len(dr.Subsets), 
dr.TrafficPolicy != nil, hasTLS, tlsMode)
                        return dr
                }
        } else {
@@ -874,7 +930,13 @@ func (ps *PushContext) DestinationRuleForService(namespace 
string, hostname host
        log.Debugf("DestinationRuleForService: checking exported rules (found 
%d exported namespaces)", len(ps.subsetRuleIndex.exportedByNamespace))
        for ns, exported := range ps.subsetRuleIndex.exportedByNamespace {
                if dr := firstDestinationRule(exported, hostname); dr != nil {
-                       log.Infof("DestinationRuleForService: found 
DestinationRule in exported rules from namespace %s for %s/%s with %d subsets", 
ns, namespace, hostname, len(dr.Subsets))
+                       hasTLS := dr.TrafficPolicy != nil && 
dr.TrafficPolicy.Tls != nil
+                       tlsMode := "none"
+                       if hasTLS {
+                               tlsMode = dr.TrafficPolicy.Tls.Mode.String()
+                       }
+                       log.Infof("DestinationRuleForService: found 
DestinationRule in exported rules from namespace %s for %s/%s with %d subsets 
(has TrafficPolicy: %v, has TLS: %v, TLS mode: %s)",
+                               ns, namespace, hostname, len(dr.Subsets), 
dr.TrafficPolicy != nil, hasTLS, tlsMode)
                        return dr
                }
        }
@@ -883,7 +945,13 @@ func (ps *PushContext) DestinationRuleForService(namespace 
string, hostname host
        if rootRules := ps.subsetRuleIndex.rootNamespaceLocal; rootRules != nil 
{
                log.Debugf("DestinationRuleForService: checking root namespace 
rules (found %d specific rules)", len(rootRules.specificSubRules))
                if dr := firstDestinationRule(rootRules, hostname); dr != nil {
-                       log.Infof("DestinationRuleForService: found 
DestinationRule in root namespace for %s/%s with %d subsets", namespace, 
hostname, len(dr.Subsets))
+                       hasTLS := dr.TrafficPolicy != nil && 
dr.TrafficPolicy.Tls != nil
+                       tlsMode := "none"
+                       if hasTLS {
+                               tlsMode = dr.TrafficPolicy.Tls.Mode.String()
+                       }
+                       log.Infof("DestinationRuleForService: found 
DestinationRule in root namespace for %s/%s with %d subsets (has TrafficPolicy: 
%v, has TLS: %v, TLS mode: %s)",
+                               namespace, hostname, len(dr.Subsets), 
dr.TrafficPolicy != nil, hasTLS, tlsMode)
                        return dr
                }
        }
@@ -915,12 +983,45 @@ func firstDestinationRule(csr *consolidatedSubRules, 
hostname host.Name) *networ
                return nil
        }
        if rules := csr.specificSubRules[hostname]; len(rules) > 0 {
-               log.Debugf("firstDestinationRule: found %d rules for hostname 
%s", len(rules), hostname)
-               if dr, ok := rules[0].rule.Spec.(*networking.DestinationRule); 
ok {
-                       log.Debugf("firstDestinationRule: successfully cast to 
DestinationRule for hostname %s", hostname)
-                       return dr
+               log.Infof("firstDestinationRule: found %d rules for hostname 
%s", len(rules), hostname)
+               // CRITICAL: According to Istio behavior, multiple 
DestinationRules should be merged into one.
+               // The first rule should contain the merged result if merge was 
successful.
+               // However, if merge failed (e.g., 
EnableEnhancedSubsetRuleMerge is disabled),
+               // we need to check all rules and prefer the one with TLS 
configuration.
+               // This ensures that when multiple SubsetRules exist (e.g., one 
with subsets, one with TLS),
+               // we return the one that has TLS if available, or the first 
one otherwise.
+               var bestRule *networking.DestinationRule
+               var bestRuleHasTLS bool
+               for i, rule := range rules {
+                       if dr, ok := 
rule.rule.Spec.(*networking.DestinationRule); ok {
+                               hasTLS := dr.TrafficPolicy != nil && 
dr.TrafficPolicy.Tls != nil
+                               if hasTLS {
+                                       tlsMode := dr.TrafficPolicy.Tls.Mode
+                                       tlsModeStr := 
dr.TrafficPolicy.Tls.Mode.String()
+                                       hasTLS = (tlsMode == 
networking.ClientTLSSettings_ISTIO_MUTUAL || tlsModeStr == "DUBBO_MUTUAL")
+                               }
+                               if i == 0 {
+                                       // Always use first rule as fallback
+                                       bestRule = dr
+                                       bestRuleHasTLS = hasTLS
+                               } else if hasTLS && !bestRuleHasTLS {
+                                       // Prefer rule with TLS over rule 
without TLS
+                                       log.Infof("firstDestinationRule: found 
rule %d with TLS for hostname %s, preferring it over rule 0", i, hostname)
+                                       bestRule = dr
+                                       bestRuleHasTLS = hasTLS
+                               }
+                       }
+               }
+               if bestRule != nil {
+                       tlsMode := "none"
+                       if bestRuleHasTLS {
+                               tlsMode = 
bestRule.TrafficPolicy.Tls.Mode.String()
+                       }
+                       log.Infof("firstDestinationRule: returning 
DestinationRule for hostname %s (has TrafficPolicy: %v, has TLS: %v, TLS mode: 
%s, has %d subsets)",
+                               hostname, bestRule.TrafficPolicy != nil, 
bestRuleHasTLS, tlsMode, len(bestRule.Subsets))
+                       return bestRule
                } else {
-                       log.Warnf("firstDestinationRule: failed to cast rule to 
DestinationRule for hostname %s", hostname)
+                       log.Warnf("firstDestinationRule: failed to cast any 
rule to DestinationRule for hostname %s", hostname)
                }
        } else {
                log.Debugf("firstDestinationRule: no specific rules found for 
hostname %s (available hostnames: %v)", hostname, func() []string {
diff --git a/dubbod/planet/pkg/model/subsetrule.go 
b/dubbod/planet/pkg/model/subsetrule.go
index 954b8cde..1b829318 100644
--- a/dubbod/planet/pkg/model/subsetrule.go
+++ b/dubbod/planet/pkg/model/subsetrule.go
@@ -41,6 +41,7 @@ func (ps *PushContext) mergeSubsetRule(p 
*consolidatedSubRules, subRuleConfig co
        }
 
        if mdrList, exists := subRules[resolvedHost]; exists {
+               log.Infof("mergeSubsetRule: found existing rules for host %s 
(count: %d)", resolvedHost, len(mdrList))
                // `appendSeparately` determines if the incoming destination 
rule would become a new unique entry in the processedDestRules list.
                appendSeparately := true
                for _, mdr := range mdrList {
@@ -72,6 +73,8 @@ func (ps *PushContext) mergeSubsetRule(p 
*consolidatedSubRules, subRuleConfig co
                        // at the same time added as a unique entry in the 
processedDestRules.
                        if bothWithoutSelector || (bothWithSelector && 
selectorsMatch) {
                                appendSeparately = false
+                               log.Debugf("mergeSubsetRule: will merge rules 
for host %s (bothWithoutSelector: %v, bothWithSelector: %v, selectorsMatch: 
%v)",
+                                       resolvedHost, bothWithoutSelector, 
bothWithSelector, selectorsMatch)
                        }
 
                        // Deep copy destination rule, to prevent mutate it 
later when merge with a new one.
@@ -96,10 +99,28 @@ func (ps *PushContext) mergeSubsetRule(p 
*consolidatedSubRules, subRuleConfig co
                                }
                        }
 
-                       // If there is no top level policy and the incoming 
rule has top level
-                       // traffic policy, use the one from the incoming rule.
-                       if mergedRule.TrafficPolicy == nil && 
rule.TrafficPolicy != nil {
-                               mergedRule.TrafficPolicy = rule.TrafficPolicy
+                       // Merge top-level traffic policy. Historically we only 
copied the first non-nil policy,
+                       // which meant a later SubsetRule that supplied TLS 
settings was ignored once a prior
+                       // rule (e.g. subsets only) existed. To match Istio's 
behavior and ensure Proxyless gRPC
+                       // can enable mTLS after subsets are defined, allow the 
incoming rule to override the TLS
+                       // portion even when a Common TrafficPolicy already 
exists.
+                       if rule.TrafficPolicy != nil {
+                               if mergedRule.TrafficPolicy == nil {
+                                       // First rule with TrafficPolicy, copy 
it entirely
+                                       mergedRule.TrafficPolicy = 
rule.TrafficPolicy
+                                       log.Infof("mergeSubsetRule: copied 
TrafficPolicy from new rule to merged rule for host %s (has TLS: %v)",
+                                               resolvedHost, 
rule.TrafficPolicy.Tls != nil)
+                               } else {
+                                       // Merge TrafficPolicy fields, with TLS 
settings from the latest rule taking precedence
+                                       if rule.TrafficPolicy.Tls != nil {
+                                               // CRITICAL: TLS settings from 
the latest rule always win (ISTIO_MUTUAL/DUBBO_MUTUAL)
+                                               mergedRule.TrafficPolicy.Tls = 
rule.TrafficPolicy.Tls
+                                               log.Infof("mergeSubsetRule: 
updated TLS settings in merged TrafficPolicy for host %s (mode: %v)",
+                                                       resolvedHost, 
rule.TrafficPolicy.Tls.Mode)
+                                       }
+                                       // Merge other TrafficPolicy fields if 
needed (loadBalancer, connectionPool, etc.)
+                                       // For now, we only merge TLS as it's 
the critical setting for mTLS
+                               }
                        }
                }
                if appendSeparately {
diff --git a/dubbod/planet/pkg/networking/grpcgen/cds.go 
b/dubbod/planet/pkg/networking/grpcgen/cds.go
index 3efff2bb..835c27a5 100644
--- a/dubbod/planet/pkg/networking/grpcgen/cds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/cds.go
@@ -19,6 +19,7 @@ package grpcgen
 
 import (
        "fmt"
+       "strings"
 
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
        discovery 
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@@ -29,6 +30,8 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
        core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+       tlsv3 
"github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
+       networking "istio.io/api/networking/v1alpha3"
 )
 
 type clusterBuilder struct {
@@ -115,7 +118,29 @@ func newClusterBuilder(node *model.Proxy, push 
*model.PushContext, defaultCluste
 func (b *clusterBuilder) build() []*cluster.Cluster {
        var defaultCluster *cluster.Cluster
        defaultRequested := b.filter == nil || 
b.filter.Contains(b.defaultClusterName)
-       if defaultRequested {
+
+       // CRITICAL: Check if DestinationRule has TLS configuration before 
generating default cluster
+       // According to Istio's proxyless gRPC implementation:
+       // - DestinationRule with ISTIO_MUTUAL configures CLIENT-SIDE 
(outbound) mTLS
+       // - If DestinationRule specifies ISTIO_MUTUAL, we MUST generate 
default cluster and apply TLS
+       //   even if it's not explicitly requested, so that clients can use 
mTLS when connecting
+       // Reference: 
https://istio.io/latest/blog/2021/proxyless-grpc/#enabling-mtls
+       var dr *networking.DestinationRule
+       if b.svc != nil {
+               dr = 
b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.hostname)
+               if dr == nil && b.svc.Hostname != b.hostname {
+                       dr = 
b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.svc.Hostname)
+               }
+       }
+       hasTLSInDR := dr != nil && dr.TrafficPolicy != nil && 
dr.TrafficPolicy.Tls != nil
+       if hasTLSInDR {
+               tlsMode := dr.TrafficPolicy.Tls.Mode
+               tlsModeStr := dr.TrafficPolicy.Tls.Mode.String()
+               hasTLSInDR = (tlsMode == 
networking.ClientTLSSettings_ISTIO_MUTUAL || tlsModeStr == "DUBBO_MUTUAL")
+       }
+
+       // Generate default cluster if requested OR if DestinationRule has 
ISTIO_MUTUAL TLS
+       if defaultRequested || hasTLSInDR {
                defaultCluster = b.edsCluster(b.defaultClusterName)
                // CRITICAL: For gRPC proxyless, we need to set CommonLbConfig 
to handle endpoint health status
                // Following Istio's implementation, we should include 
UNHEALTHY and DRAINING endpoints
@@ -138,10 +163,20 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
                                core.HealthStatus_DEGRADED,
                        },
                }
-               log.Infof("clusterBuilder.build: generated default cluster %s", 
b.defaultClusterName)
+               // TLS will be applied in applyDestinationRule after 
DestinationRule is found
+               if hasTLSInDR {
+                       log.Infof("clusterBuilder.build: generated default 
cluster %s (required for ISTIO_MUTUAL TLS)", b.defaultClusterName)
+               } else {
+                       log.Infof("clusterBuilder.build: generated default 
cluster %s", b.defaultClusterName)
+               }
        }
 
-       subsetClusters := b.applyDestinationRule(defaultCluster)
+       subsetClusters, newDefaultCluster := 
b.applyDestinationRule(defaultCluster)
+       // If applyDestinationRule generated a new default cluster (because TLS 
was found but cluster wasn't generated in build()),
+       // use it instead of the original defaultCluster
+       if newDefaultCluster != nil {
+               defaultCluster = newDefaultCluster
+       }
        out := make([]*cluster.Cluster, 0, 1+len(subsetClusters))
        if defaultCluster != nil {
                out = append(out, defaultCluster)
@@ -171,25 +206,84 @@ func (b *clusterBuilder) edsCluster(name string) 
*cluster.Cluster {
        }
 }
 
-func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster) 
(subsetClusters []*cluster.Cluster) {
+func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster) 
(subsetClusters []*cluster.Cluster, newDefaultCluster *cluster.Cluster) {
        if b.svc == nil || b.port == nil {
                log.Warnf("applyDestinationRule: service or port is nil for 
%s", b.defaultClusterName)
-               return nil
+               return nil, nil
        }
        log.Infof("applyDestinationRule: looking for DestinationRule for 
service %s/%s (hostname=%s, port=%d)",
                b.svc.Attributes.Namespace, b.svc.Attributes.Name, b.hostname, 
b.portNum)
        dr := b.push.DestinationRuleForService(b.svc.Attributes.Namespace, 
b.hostname)
        if dr == nil {
-               log.Warnf("applyDestinationRule: no DestinationRule found for 
%s/%s", b.svc.Attributes.Namespace, b.hostname)
-               return nil
+               // If not found with b.hostname, try with the service's FQDN 
hostname
+               if b.svc.Hostname != b.hostname {
+                       dr = 
b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.svc.Hostname)
+               }
+               if dr == nil {
+                       log.Warnf("applyDestinationRule: no DestinationRule 
found for %s/%s or %s", b.svc.Attributes.Namespace, b.hostname, b.svc.Hostname)
+                       return nil, nil
+               }
        }
-       if len(dr.Subsets) == 0 {
-               log.Warnf("applyDestinationRule: DestinationRule found for 
%s/%s but has no subsets", b.svc.Attributes.Namespace, b.hostname)
-               return nil
+
+       // Check if DestinationRule has TLS configuration
+       hasTLS := dr.TrafficPolicy != nil && dr.TrafficPolicy.Tls != nil
+       if hasTLS {
+               tlsMode := dr.TrafficPolicy.Tls.Mode
+               tlsModeStr := dr.TrafficPolicy.Tls.Mode.String()
+               hasTLS = (tlsMode == networking.ClientTLSSettings_ISTIO_MUTUAL 
|| tlsModeStr == "DUBBO_MUTUAL")
+       }
+
+       // If no subsets and no TLS, there's nothing to do
+       if len(dr.Subsets) == 0 && !hasTLS {
+               log.Warnf("applyDestinationRule: DestinationRule found for 
%s/%s but has no subsets and no TLS policy", b.svc.Attributes.Namespace, 
b.hostname)
+               return nil, nil
        }
 
-       log.Infof("applyDestinationRule: found DestinationRule for %s/%s with 
%d subsets, defaultCluster requested=%v",
-               b.svc.Attributes.Namespace, b.hostname, len(dr.Subsets), 
defaultCluster != nil)
+       log.Infof("applyDestinationRule: found DestinationRule for %s/%s with 
%d subsets, defaultCluster requested=%v, hasTLS=%v",
+               b.svc.Attributes.Namespace, b.hostname, len(dr.Subsets), 
defaultCluster != nil, hasTLS)
+
+       // CRITICAL: Apply TLS to default cluster if it exists and doesn't have 
TransportSocket yet
+       // This ensures that default cluster gets TLS from the top-level 
TrafficPolicy in DestinationRule
+       // When SubsetRule sets ISTIO_MUTUAL, inbound listener enforces STRICT 
mTLS, so outbound must also use TLS
+       // NOTE: We re-check hasTLS here because firstDestinationRule might 
have returned a different rule
+       // than the one checked in build(), especially when multiple 
SubsetRules exist and merge failed
+       if defaultCluster != nil && defaultCluster.TransportSocket == nil {
+               // Re-check TLS in case DestinationRule was found here but not 
in build()
+               recheckTLS := dr != nil && dr.TrafficPolicy != nil && 
dr.TrafficPolicy.Tls != nil
+               if recheckTLS {
+                       tlsMode := dr.TrafficPolicy.Tls.Mode
+                       tlsModeStr := dr.TrafficPolicy.Tls.Mode.String()
+                       recheckTLS = (tlsMode == 
networking.ClientTLSSettings_ISTIO_MUTUAL || tlsModeStr == "DUBBO_MUTUAL")
+               }
+               if hasTLS || recheckTLS {
+                       log.Infof("applyDestinationRule: applying TLS to 
default cluster %s (DestinationRule has ISTIO_MUTUAL)", b.defaultClusterName)
+                       b.applyTLSForCluster(defaultCluster, nil)
+               } else {
+                       log.Debugf("applyDestinationRule: skipping TLS for 
default cluster %s (DestinationRule has no TrafficPolicy or TLS)", 
b.defaultClusterName)
+               }
+       } else if defaultCluster == nil && hasTLS {
+               // CRITICAL: If default cluster was not generated in build() 
but DestinationRule has TLS,
+               // we need to generate it here to ensure TLS is applied
+               // This can happen if build() checked the first rule (without 
TLS) but applyDestinationRule
+               // found a different rule (with TLS) via firstDestinationRule's 
improved logic
+               log.Warnf("applyDestinationRule: default cluster was not 
generated in build() but DestinationRule has TLS, generating it now")
+               defaultCluster = b.edsCluster(b.defaultClusterName)
+               if defaultCluster.CommonLbConfig == nil {
+                       defaultCluster.CommonLbConfig = 
&cluster.Cluster_CommonLbConfig{}
+               }
+               defaultCluster.CommonLbConfig.OverrideHostStatus = 
&core.HealthStatusSet{
+                       Statuses: []core.HealthStatus{
+                               core.HealthStatus_HEALTHY,
+                               core.HealthStatus_UNHEALTHY,
+                               core.HealthStatus_DRAINING,
+                               core.HealthStatus_UNKNOWN,
+                               core.HealthStatus_DEGRADED,
+                       },
+               }
+               log.Infof("applyDestinationRule: applying TLS to newly 
generated default cluster %s (DestinationRule has ISTIO_MUTUAL)", 
b.defaultClusterName)
+               b.applyTLSForCluster(defaultCluster, nil)
+               return nil, defaultCluster // Return the newly generated 
default cluster
+       }
 
        var commonLbConfig *cluster.Cluster_CommonLbConfig
        if defaultCluster != nil {
@@ -235,9 +329,112 @@ func (b *clusterBuilder) 
applyDestinationRule(defaultCluster *cluster.Cluster) (
                log.Infof("applyDestinationRule: generating subset cluster %s 
for subset %s", clusterName, subset.Name)
                subsetCluster := b.edsCluster(clusterName)
                subsetCluster.CommonLbConfig = commonLbConfig
+               b.applyTLSForCluster(subsetCluster, subset)
                subsetClusters = append(subsetClusters, subsetCluster)
        }
 
        log.Infof("applyDestinationRule: generated %d subset clusters for 
%s/%s", len(subsetClusters), b.svc.Attributes.Namespace, b.hostname)
-       return subsetClusters
+       return subsetClusters, nil
+}
+
+// applyTLSForCluster attaches a gRPC-compatible TLS transport socket whenever 
the
+// DestinationRule (or subset override) specifies ISTIO_MUTUAL/DUBBO_MUTUAL 
mode.
+func (b *clusterBuilder) applyTLSForCluster(c *cluster.Cluster, subset 
*networking.Subset) {
+       if c == nil || b.svc == nil {
+               return
+       }
+
+       dr := b.push.DestinationRuleForService(b.svc.Attributes.Namespace, 
b.hostname)
+       if dr == nil && b.svc.Hostname != b.hostname {
+               // If not found with b.hostname, try with the service's FQDN 
hostname
+               dr = 
b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.svc.Hostname)
+       }
+       if dr == nil {
+               log.Warnf("applyTLSForCluster: no DestinationRule found for 
cluster %s (namespace=%s, hostname=%s, service hostname=%s)",
+                       c.Name, b.svc.Attributes.Namespace, b.hostname, 
b.svc.Hostname)
+               return
+       }
+
+       var policy *networking.TrafficPolicy
+       if subset != nil && subset.TrafficPolicy != nil {
+               policy = subset.TrafficPolicy
+               log.Infof("applyTLSForCluster: using TrafficPolicy from subset 
%s for cluster %s", subset.Name, c.Name)
+       } else {
+               policy = dr.TrafficPolicy
+               if policy != nil {
+                       log.Infof("applyTLSForCluster: using top-level 
TrafficPolicy for cluster %s", c.Name)
+               }
+       }
+
+       if policy == nil || policy.Tls == nil {
+               if policy == nil {
+                       log.Warnf("applyTLSForCluster: no TrafficPolicy found 
in DestinationRule for cluster %s", c.Name)
+               } else {
+                       log.Warnf("applyTLSForCluster: no TLS settings in 
TrafficPolicy for cluster %s", c.Name)
+               }
+               return
+       }
+
+       mode := policy.Tls.Mode
+       modeStr := policy.Tls.Mode.String()
+       if mode != networking.ClientTLSSettings_ISTIO_MUTUAL && modeStr != 
"DUBBO_MUTUAL" {
+               log.Debugf("applyTLSForCluster: TLS mode %v (%s) not supported 
for gRPC proxyless, skipping", mode, modeStr)
+               return
+       }
+
+       tlsContext := b.buildUpstreamTLSContext(c, policy.Tls)
+       if tlsContext == nil {
+               log.Warnf("applyTLSForCluster: failed to build TLS context for 
cluster %s", c.Name)
+               return
+       }
+
+       // Log SNI configuration for debugging
+       sni := tlsContext.Sni
+       if sni == "" {
+               log.Warnf("applyTLSForCluster: SNI is empty for cluster %s, 
this may cause TLS handshake failures", c.Name)
+       } else {
+               log.Debugf("applyTLSForCluster: using SNI=%s for cluster %s", 
sni, c.Name)
+       }
+
+       c.TransportSocket = &core.TransportSocket{
+               Name:       "envoy.transport_sockets.tls",
+               ConfigType: &core.TransportSocket_TypedConfig{TypedConfig: 
protoconv.MessageToAny(tlsContext)},
+       }
+       log.Infof("applyTLSForCluster: applied %v TLS transport socket to 
cluster %s (SNI=%s)", mode, c.Name, sni)
+}
+
+// buildUpstreamTLSContext builds an UpstreamTlsContext that conforms to gRPC 
xDS expectations,
+// reusing the common certificate-provider setup from buildCommonTLSContext.
+func (b *clusterBuilder) buildUpstreamTLSContext(c *cluster.Cluster, 
tlsSettings *networking.ClientTLSSettings) *tlsv3.UpstreamTlsContext {
+       common := buildCommonTLSContext()
+       if common == nil {
+               return nil
+       }
+
+       tlsContext := &tlsv3.UpstreamTlsContext{
+               CommonTlsContext: common,
+               Sni:              tlsSettings.GetSni(),
+       }
+       // CRITICAL: SNI must be the service hostname, not the cluster name
+       // Cluster name format: outbound|port|subset|hostname
+       // When the TLS settings carry no explicit SNI, prefer the service 
hostname and only then fall back to the hostname part of the cluster name
+       if tlsContext.Sni == "" {
+               if b.svc != nil && b.svc.Hostname != "" {
+                       tlsContext.Sni = string(b.svc.Hostname)
+               } else {
+                       // Fallback: try to extract hostname from cluster name
+                       // Cluster name format: outbound|port|subset|hostname
+                       parts := strings.Split(c.Name, "|")
+                       if len(parts) >= 4 {
+                               tlsContext.Sni = parts[3]
+                       } else {
+                               // Last resort: use cluster name (not ideal but 
better than empty)
+                               tlsContext.Sni = c.Name
+                               log.Warnf("buildUpstreamTLSContext: using 
cluster name as SNI fallback for %s (should be service hostname)", c.Name)
+                       }
+               }
+       }
+       // Proxyless gRPC always speaks HTTP/2, advertise h2 via ALPN.
+       tlsContext.CommonTlsContext.AlpnProtocols = []string{"h2"}
+       return tlsContext
 }
diff --git a/dubbod/planet/pkg/networking/grpcgen/grpcgen.go 
b/dubbod/planet/pkg/networking/grpcgen/grpcgen.go
index c4544aa0..f09cd3d4 100644
--- a/dubbod/planet/pkg/networking/grpcgen/grpcgen.go
+++ b/dubbod/planet/pkg/networking/grpcgen/grpcgen.go
@@ -21,6 +21,7 @@ import (
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
        v3 "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/xds/v3"
        dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
+       tlsv3 
"github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
 )
 
 var log = dubbolog.RegisterScope("grpcgen", "xDS Generator for Proxyless gRPC")
@@ -51,3 +52,35 @@ func (g *GrpcConfigGenerator) Generate(proxy *model.Proxy, w 
*model.WatchedResou
 
        return nil, model.DefaultXdsLogDetails, nil
 }
+
+// buildCommonTLSContext creates a TLS context that matches gRPC xDS 
expectations.
+// It is adapted from Istio's buildCommonTLSContext implementation, but kept 
minimal:
+// - Uses certificate provider "default" for workload certs and root CA
+// - Does not configure explicit SAN matches (left to future hardening)
+func buildCommonTLSContext() *tlsv3.CommonTlsContext {
+       return &tlsv3.CommonTlsContext{
+               // Workload certificate provider instance (SPIFFE workload cert 
chain)
+               TlsCertificateCertificateProviderInstance: 
&tlsv3.CommonTlsContext_CertificateProviderInstance{
+                       InstanceName:    "default",
+                       CertificateName: "default",
+               },
+               // Root CA provider instance
+               ValidationContextType: 
&tlsv3.CommonTlsContext_CombinedValidationContext{
+                       CombinedValidationContext: 
&tlsv3.CommonTlsContext_CombinedCertificateValidationContext{
+                               ValidationContextCertificateProviderInstance: 
&tlsv3.CommonTlsContext_CertificateProviderInstance{
+                                       InstanceName:    "default",
+                                       CertificateName: "ROOTCA",
+                               },
+                               // DefaultValidationContext is intentionally 
left empty (no SAN matchers are configured)
+                               // The certificate provider instance (ROOTCA) 
provides the root CA for validation
+                               // For gRPC proxyless, we rely on the 
certificate provider for root CA validation
+                               // SAN matching can be added later if needed 
for stricter validation
+                               DefaultValidationContext: 
&tlsv3.CertificateValidationContext{
+                                       // Trust the root CA from the 
certificate provider
+                                       // The certificate provider instance 
"default" with "ROOTCA" will provide
+                                       // the root CA certificates for 
validating peer certificates
+                               },
+                       },
+               },
+       }
+}
diff --git a/dubbod/planet/pkg/networking/grpcgen/lds.go 
b/dubbod/planet/pkg/networking/grpcgen/lds.go
index bd98e07f..015681ec 100644
--- a/dubbod/planet/pkg/networking/grpcgen/lds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/lds.go
@@ -35,7 +35,9 @@ import (
        route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
        routerv3 
"github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
        hcmv3 
"github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
+       tlsv3 
"github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
        discovery 
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+       wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
 )
 
 type listenerNames map[string]listenerName
@@ -197,6 +199,16 @@ func buildInboundListeners(node *model.Proxy, push 
*model.PushContext, names []s
                        continue
                }
 
+               // According to Istio's proxyless gRPC implementation:
+               // - DestinationRule with ISTIO_MUTUAL only configures 
CLIENT-SIDE (outbound) mTLS
+               // - PeerAuthentication with STRICT configures SERVER-SIDE 
(inbound) mTLS
+               // Both are REQUIRED for mTLS to work. Server-side mTLS should 
ONLY be controlled by PeerAuthentication.
+               // Reference: 
https://istio.io/latest/blog/2021/proxyless-grpc/#enabling-mtls
+               mode := push.InboundMTLSModeForProxy(node, uint32(listenPort))
+               if mode == model.MTLSPermissive {
+                       log.Warnf("buildInboundListeners: PERMISSIVE mTLS is 
not supported for proxyless gRPC; defaulting to plaintext on listener %s", name)
+               }
+
                // For proxyless gRPC inbound listeners, we need a FilterChain 
with HttpConnectionManager filter
                // to satisfy gRPC client requirements. According to grpc-go 
issue #7691 and the error
                // "missing HttpConnectionManager filter", gRPC proxyless 
clients require HttpConnectionManager
@@ -238,6 +250,32 @@ func buildInboundListeners(node *model.Proxy, push 
*model.PushContext, names []s
                        },
                }
 
+               filterChain := &listener.FilterChain{
+                       Filters: []*listener.Filter{
+                               {
+                                       Name: wellknown.HTTPConnectionManager,
+                                       ConfigType: 
&listener.Filter_TypedConfig{
+                                               TypedConfig: 
protoconv.MessageToAny(hcm),
+                                       },
+                               },
+                       },
+               }
+
+               if ts := buildDownstreamTransportSocket(mode); ts != nil {
+                       // For STRICT mTLS mode, set TransportSocket to require 
TLS connections
+                       // When TransportSocket is present, only TLS 
connections can match this FilterChain
+                       // No FilterChainMatch needed - gRPC proxyless will 
automatically match based on TransportSocket presence
+                       filterChain.TransportSocket = ts
+                       log.Infof("buildInboundListeners: applied STRICT mTLS 
transport socket to listener %s (mode=%v, requires client cert=true)", name, 
mode)
+               } else if mode == model.MTLSStrict {
+                       log.Warnf("buildInboundListeners: expected to enable 
STRICT mTLS on listener %s but failed to build transport socket (mode=%v)", 
name, mode)
+               } else {
+                       // For plaintext mode, no TransportSocket means only 
plaintext connections can match
+                       // No FilterChainMatch needed - gRPC proxyless will 
automatically match based on TransportSocket absence
+                       // TLS connections will fail to match this FilterChain 
(no TransportSocket) and connection will fail
+                       log.Debugf("buildInboundListeners: listener %s using 
plaintext (mode=%v) - clients using TLS will fail to connect", name, mode)
+               }
+
                ll := &listener.Listener{
                        Name: name,
                        Address: &core.Address{Address: 
&core.Address_SocketAddress{
@@ -249,18 +287,7 @@ func buildInboundListeners(node *model.Proxy, push 
*model.PushContext, names []s
                                },
                        }},
                        // Create FilterChain with HttpConnectionManager filter 
for proxyless gRPC
-                       FilterChains: []*listener.FilterChain{
-                               {
-                                       Filters: []*listener.Filter{
-                                               {
-                                                       Name: 
wellknown.HTTPConnectionManager,
-                                                       ConfigType: 
&listener.Filter_TypedConfig{
-                                                               TypedConfig: 
protoconv.MessageToAny(hcm),
-                                                       },
-                                               },
-                                       },
-                               },
-                       },
+                       FilterChains: []*listener.FilterChain{filterChain},
                        // the following must not be set or the client will NACK
                        ListenerFilters: nil,
                        UseOriginalDst:  nil,
@@ -279,6 +306,31 @@ func buildInboundListeners(node *model.Proxy, push 
*model.PushContext, names []s
        return out
 }
 
+func buildDownstreamTransportSocket(mode model.MutualTLSMode) 
*core.TransportSocket {
+       if mode != model.MTLSStrict {
+               return nil
+       }
+       common := buildCommonTLSContext()
+       if common == nil {
+               return nil
+       }
+       common.AlpnProtocols = []string{"h2"}
+
+       // For STRICT mTLS, we require client certificates and validate them
+       // The validation context is already configured in buildCommonTLSContext
+       // via the certificate provider instance (ROOTCA)
+       tlsContext := &tlsv3.DownstreamTlsContext{
+               CommonTlsContext:         common,
+               RequireClientCertificate: wrapperspb.Bool(true),
+               // Note: gRPC proxyless uses certificate provider for validation
+               // The ValidationContextType in CommonTlsContext handles client 
cert validation
+       }
+       return &core.TransportSocket{
+               Name:       "envoy.transport_sockets.tls",
+               ConfigType: &core.TransportSocket_TypedConfig{TypedConfig: 
protoconv.MessageToAny(tlsContext)},
+       }
+}
+
 func buildOutboundListeners(node *model.Proxy, push *model.PushContext, filter 
listenerNames) model.Resources {
        out := make(model.Resources, 0, len(filter))
 
diff --git a/dubbod/planet/pkg/xds/cds.go b/dubbod/planet/pkg/xds/cds.go
index 621e1e21..1cc35be7 100644
--- a/dubbod/planet/pkg/xds/cds.go
+++ b/dubbod/planet/pkg/xds/cds.go
@@ -20,6 +20,7 @@ package xds
 import (
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/networking/core"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
 )
 
 type CdsGenerator struct {
@@ -33,6 +34,17 @@ func cdsNeedsPush(req *model.PushRequest, proxy 
*model.Proxy) (*model.PushReques
                return req, res
        }
 
+       // CRITICAL: According to Istio proxyless gRPC behavior, when 
SubsetRule (DestinationRule) is created/updated
+       // with TLS configuration (ISTIO_MUTUAL), CDS must be pushed to update 
cluster TransportSocket.
+       // Even if req.Full is false, any SubsetRule update triggers a CDS 
push, as it may affect cluster TLS config.
+       if req != nil && req.ConfigsUpdated != nil {
+               // Check if SubsetRule was updated - this requires CDS push to 
update cluster TransportSocket
+               if model.HasConfigsOfKind(req.ConfigsUpdated, kind.SubsetRule) {
+                       log.Debugf("cdsNeedsPush: SubsetRule updated, CDS push 
required to update cluster TLS config")
+                       return req, true
+               }
+       }
+
        if !req.Full {
                return req, false
        }
diff --git a/manifests/charts/base/files/crd-all.yaml 
b/manifests/charts/base/files/crd-all.yaml
index 9f82a97e..96333392 100644
--- a/manifests/charts/base/files/crd-all.yaml
+++ b/manifests/charts/base/files/crd-all.yaml
@@ -90,12 +90,12 @@ spec:
                                 description: |-
                                   Indicates whether connections to this port 
should be secured using TLS.
                                   
-                                  Valid Options: DISABLE, SIMPLE, MUTUAL, 
DUBBO_MUTUAL
+                                  Valid Options: DISABLE, SIMPLE, MUTUAL, 
ISTIO_MUTUAL
                                 enum:
                                   - DISABLE
                                   - SIMPLE
                                   - MUTUAL
-                                  - DUBBO_MUTUAL
+                                  - ISTIO_MUTUAL
                                 type: string
                             type: object
                         type: object
@@ -114,12 +114,12 @@ spec:
                           description: |-
                             Indicates whether connections to this port should 
be secured using TLS.
                             
-                            Valid Options: DISABLE, SIMPLE, MUTUAL, 
DUBBO_MUTUAL
+                            Valid Options: DISABLE, SIMPLE, MUTUAL, 
ISTIO_MUTUAL
                           enum:
                             - DISABLE
                             - SIMPLE
                             - MUTUAL
-                            - DUBBO_MUTUAL
+                            - ISTIO_MUTUAL
                           type: string
                       type: object
                   type: object
diff --git 
a/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml 
b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
index bc0264d7..b8e63967 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
@@ -17,6 +17,11 @@ metadata:
   annotations:
     dubbo.apache.org/rev: default
 spec:
+  # NOTE:
+  #   Application containers that rely on this template must mount the same 
"dubbo-data"
+  #   volume at /var/lib/dubbo/data and set 
GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT=true
+  #   in order for proxyless gRPC security (mTLS) to pick up the certificates 
generated
+  #   by the dubbo-proxy sidecar.
   containers:
     - name: dubbo-proxy
       image: "{{ $.ProxyImage }}"
@@ -57,6 +62,10 @@ spec:
           value: dubbod
         - name: DUBBO_META_CLUSTER_ID
           value: Kubernetes
+        - name: DUBBO_META_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
         - name: CA_ADDR
           value: dubbod.dubbo-system.svc:15012
         - name: PROXY_CONFIG
diff --git a/pkg/dubbo-agent/agent.go b/pkg/dubbo-agent/agent.go
index 8e560d84..1fb56210 100644
--- a/pkg/dubbo-agent/agent.go
+++ b/pkg/dubbo-agent/agent.go
@@ -35,6 +35,7 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/model"
 
        "github.com/apache/dubbo-kubernetes/dubbod/security/pkg/nodeagent/cache"
+       "github.com/apache/dubbo-kubernetes/pkg/backoff"
        "github.com/apache/dubbo-kubernetes/pkg/bootstrap"
        "github.com/apache/dubbo-kubernetes/pkg/config/constants"
        "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent/grpcxds"
@@ -142,7 +143,7 @@ func (a *Agent) Run(ctx context.Context) (func(), error) {
        }
 
        log.Info("Starting default Dubbo SDS Server")
-       err = a.initSdsServer()
+       err = a.initSdsServer(ctx)
        if err != nil {
                return nil, fmt.Errorf("failed to start default Dubbo SDS 
server: %v", err)
        }
@@ -219,19 +220,6 @@ func (a *Agent) Run(ctx context.Context) (func(), error) {
                }
        }()
 
-       // Wait for certificate generation to complete before establishing 
preemptive connection.
-       // This ensures certificate logs appear after "Opening status port" but 
before "Establishing preemptive connection".
-       // The SDS service generates certificates asynchronously in 
newSDSService.
-       // We wait a short time for the SDS service's async generation to 
complete and log.
-       // This avoids calling GenerateSecret ourselves, which would cause 
duplicate generation and logs.
-       if a.secretCache != nil && !a.secOpts.FileMountedCerts && 
!a.secOpts.ServeOnlyFiles {
-               // Give SDS service's async certificate generation goroutine a 
chance to complete.
-               // The SDS service starts generating certificates immediately 
after initSdsServer,
-               // so a short wait is usually sufficient for the certificates 
to be generated and logged.
-               // This avoids the need to call GenerateSecret ourselves, which 
would cause duplicate logs.
-               time.Sleep(300 * time.Millisecond)
-       }
-
        // Now set bootstrap node to trigger preemptive connection.
        // This ensures preemptive connection logs appear after certificate 
logs.
        if bootstrapNode != nil && a.xdsProxy != nil {
@@ -383,7 +371,7 @@ func (a *Agent) startFileWatcher(ctx context.Context, 
filePath string, handler f
        }
 }
 
-func (a *Agent) initSdsServer() error {
+func (a *Agent) initSdsServer(ctx context.Context) error {
        var err error
        if 
security.CheckWorkloadCertificate(security.WorkloadIdentityCertChainPath, 
security.WorkloadIdentityKeyPath, security.WorkloadIdentityRootCertPath) {
                log.Info("workload certificate files detected, creating secret 
manager without caClient")
@@ -401,9 +389,7 @@ func (a *Agent) initSdsServer() error {
 
        pkpConf := a.proxyConfig.GetPrivateKeyProvider()
        a.sdsServer = a.cfg.SDSFactory(a.secOpts, a.secretCache, pkpConf)
-       a.secretCache.RegisterSecretHandler(a.sdsServer.OnSecretUpdate)
-
-       return nil
+       return a.registerSecretHandler(ctx)
 }
 
 func (a *Agent) rebuildSDSWithNewCAClient() {
@@ -426,8 +412,55 @@ func (a *Agent) rebuildSDSWithNewCAClient() {
        a.secretCache = sc
        pkpConf := a.proxyConfig.GetPrivateKeyProvider()
        a.sdsServer = a.cfg.SDSFactory(a.secOpts, a.secretCache, pkpConf)
-       a.secretCache.RegisterSecretHandler(a.sdsServer.OnSecretUpdate)
-       log.Info("SDS server and CA client rebuilt successfully")
+       if err := a.registerSecretHandler(context.Background()); err != nil {
+               log.Errorf("failed to refresh workload certificates after CA 
rebuild: %v", err)
+       } else {
+               log.Info("SDS server and CA client rebuilt successfully")
+       }
+}
+
+func (a *Agent) registerSecretHandler(ctx context.Context) error {
+       if a.secretCache == nil {
+               return nil
+       }
+       handler := func(resourceName string) {
+               if a.sdsServer != nil {
+                       a.sdsServer.OnSecretUpdate(resourceName)
+               }
+               if resourceName == security.WorkloadKeyCertResourceName || 
resourceName == security.RootCertReqResourceName {
+                       go func() {
+                               if err := 
a.ensureWorkloadCertificates(context.Background()); err != nil {
+                                       log.Warnf("failed to refresh workload 
certificates after %s update: %v", resourceName, err)
+                               }
+                       }()
+               }
+       }
+       a.secretCache.RegisterSecretHandler(handler)
+       return a.ensureWorkloadCertificates(ctx)
+}
+
+func (a *Agent) ensureWorkloadCertificates(ctx context.Context) error {
+       if a.secretCache == nil || a.secOpts == nil || 
a.secOpts.OutputKeyCertToDir == "" {
+               // Nothing to write
+               return nil
+       }
+       generate := func(resource string) error {
+               b := backoff.NewExponentialBackOff(backoff.DefaultOption())
+               return b.RetryWithContext(ctx, func() error {
+                       if ctx.Err() != nil {
+                               return ctx.Err()
+                       }
+                       _, err := a.secretCache.GenerateSecret(resource)
+                       if err != nil {
+                               log.Warnf("failed to generate %s: %v", 
resource, err)
+                       }
+                       return err
+               })
+       }
+       if err := generate(security.WorkloadKeyCertResourceName); err != nil {
+               return err
+       }
+       return generate(security.RootCertReqResourceName)
 }
 
 func (a *Agent) generateGRPCBootstrapWithNode() (*model.Node, error) {
diff --git a/samples/grpc-app/README.md b/samples/grpc-app/README.md
index d28d0b0c..15629269 100644
--- a/samples/grpc-app/README.md
+++ b/samples/grpc-app/README.md
@@ -5,8 +5,8 @@ This example demonstrates how to deploy gRPC applications with 
proxyless service
 ## Overview
 
 This sample includes:
-- **Consumer**: A gRPC server that receives requests (port 17070)
-- **Producer**: A gRPC client that sends requests to the consumer service
+- **Producer**: A gRPC server that receives requests (port 17070) and is 
deployed with multiple versions (v1/v2) to showcase gray release scenarios.
+- **Consumer**: A gRPC client that sends requests to the producer service and 
exposes a test server (port 17171) for driving traffic via `grpcurl`.
 
 Both services use native gRPC xDS clients to connect to the Dubbo control 
plane through the `dubbo-proxy` sidecar, enabling service discovery, load 
balancing, and traffic management without requiring Envoy proxy for application 
traffic.
 
@@ -49,22 +49,35 @@ kubectl get svc -n grpc-app
 - `inject.dubbo.apache.org/templates: grpc-agent` - Uses the grpc-agent 
template
 - `proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts": true}'` 
- Ensures proxy starts before application
 
-### Environment Variables
+### Security requirements
 
-The `dubbo-proxy` sidecar automatically:
-- Creates `/etc/dubbo/proxy/grpc-bootstrap.json` bootstrap file
-- Exposes xDS proxy via Unix Domain Socket
-- Sets `GRPC_XDS_BOOTSTRAP` environment variable for application containers
+When mTLS is enabled (`SubsetRule` with `ISTIO_MUTUAL` or `PeerAuthentication 
STRICT`), **both the producer and consumer application containers must**:
+
+1. Mount the certificate output directory that `dubbo-proxy` writes to:
+   ```yaml
+   volumeMounts:
+     - name: dubbo-data
+       mountPath: /var/lib/dubbo/data
+   ```
+   The `grpc-agent` template already provides the `dubbo-data` volume; 
mounting it from the app container makes the generated `cert-chain.pem`, 
`key.pem`, and `root-cert.pem` visible to gRPC.
+
+2. Set `GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT=true` so the gRPC runtime 
actually consumes the xDS security config instead of falling back to plaintext. 
The sample manifest in `grpc-app.yaml` shows the required environment variable.
+
+3. When testing with `grpcurl`, export the same variable and pass 
`-plaintext`: the consumer test server on port 17171 does not terminate TLS, 
so only the consumer-to-producer hop is protected by mTLS:
+   ```bash
+   export GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT=true
+   grpcurl -plaintext -d 
'{"url":"xds:///producer.grpc-app.svc.cluster.local:7070","count":5}' 
localhost:17171 echo.EchoTestService/ForwardEcho
+   ```
 
 ## Testing
 
 ### Test with grpcurl
 
-1. Port forward to producer service:
+1. Port forward to the consumer test server:
 
 ```bash
 kubectl port-forward -n grpc-app \
-  $(kubectl get pod -l app=producer -n grpc-app -o 
jsonpath='{.items[0].metadata.name}') \
+  $(kubectl get pod -l app=consumer -n grpc-app -o 
jsonpath='{.items[0].metadata.name}') \
   17171:17171 &
 ```
 
@@ -72,7 +85,7 @@ kubectl port-forward -n grpc-app \
 
 ```bash
 grpcurl -plaintext -d '{
-  "url": "xds:///consumer.grpc-app.svc.cluster.local:7070",
+  "url": "xds:///producer.grpc-app.svc.cluster.local:7070",
   "count": 5
 }' localhost:17171 echo.EchoTestService/ForwardEcho
 ```
@@ -81,11 +94,11 @@ Expected output:
 ```json
 {
   "output": [
-    "[0 body] Hostname=consumer-xxx",
-    "[1 body] Hostname=consumer-yyy",
-    "[2 body] Hostname=consumer-xxx",
-    "[3 body] Hostname=consumer-yyy",
-    "[4 body] Hostname=consumer-xxx"
+    "[0 body] Hostname=producer-xxx",
+    "[1 body] Hostname=producer-yyy",
+    "[2 body] Hostname=producer-xxx",
+    "[3 body] Hostname=producer-yyy",
+    "[4 body] Hostname=producer-xxx"
   ]
 }
 ```
@@ -93,14 +106,14 @@ Expected output:
 ### Check Logs
 
 ```bash
-# Consumer logs
-kubectl logs -f -l app=consumer -n grpc-app -c app
-
 # Producer logs
 kubectl logs -f -l app=producer -n grpc-app -c app
 
+# Consumer logs
+kubectl logs -f -l app=consumer -n grpc-app -c app
+
 # Proxy sidecar logs
-kubectl logs -f -l app=consumer -n grpc-app -c dubbo-proxy
+kubectl logs -f -l app=producer -n grpc-app -c dubbo-proxy
 ```
 
 ## Troubleshooting
diff --git a/samples/grpc-app/grpc-app.yaml b/samples/grpc-app/grpc-app.yaml
index cb3bef55..6877235d 100644
--- a/samples/grpc-app/grpc-app.yaml
+++ b/samples/grpc-app/grpc-app.yaml
@@ -17,12 +17,12 @@ apiVersion: v1
 kind: Service
 metadata:
   labels:
-    app: consumer
-  name: consumer
+    app: producer
+  name: producer
   namespace: grpc-app
 spec:
   selector:
-    app: consumer
+    app: producer
   type: ClusterIP
   ports:
     - name: grpc
@@ -32,13 +32,13 @@ spec:
 apiVersion: apps/v1
 kind: Deployment
 metadata:
-  name: consumer-v1
+  name: producer-v1
   namespace: grpc-app
 spec:
-  replicas: 2
+  replicas: 1
   selector:
     matchLabels:
-      app: consumer
+      app: producer
       version: v1
   template:
     metadata:
@@ -47,18 +47,20 @@ spec:
         inject.dubbo.apache.org/templates: grpc-agent
         proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts": 
true}'
       labels:
-        app: consumer
+        app: producer
         version: v1
     spec:
       containers:
         - name: app
-          image: mfordjody/grpc-consumer:dev-debug
+          image: mfordjody/grpc-producer:dev-debug
           imagePullPolicy: Always
           ports:
             - containerPort: 17070
               protocol: TCP
               name: grpc
           env:
+            - name: GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT
+              value: "true"
             - name: INSTANCE_IP
               valueFrom:
                 fieldRef:
@@ -74,6 +76,9 @@ spec:
                   fieldPath: metadata.namespace
             - name: SERVICE_PORT
               value: "17070"
+          volumeMounts:
+            - mountPath: /var/lib/dubbo/data
+              name: dubbo-data
           readinessProbe:
             tcpSocket:
               port: 17070
@@ -90,17 +95,20 @@ spec:
             timeoutSeconds: 2
             successThreshold: 1
             failureThreshold: 3
+      volumes:
+        - name: dubbo-data
+          emptyDir: {}
 ---
 apiVersion: apps/v1
 kind: Deployment
 metadata:
-  name: consumer-v2
+  name: producer-v2
   namespace: grpc-app
 spec:
-  replicas: 2
+  replicas: 1
   selector:
     matchLabels:
-      app: consumer
+      app: producer
       version: v2
   template:
     metadata:
@@ -109,18 +117,20 @@ spec:
         inject.dubbo.apache.org/templates: grpc-agent
         proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts": 
true}'
       labels:
-        app: consumer
+        app: producer
         version: v2
     spec:
       containers:
         - name: app
-          image: mfordjody/grpc-consumer:dev-debug
+          image: mfordjody/grpc-producer:dev-debug
           imagePullPolicy: Always
           ports:
             - containerPort: 17070
               protocol: TCP
               name: grpc
           env:
+            - name: GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT
+              value: "true"
             - name: INSTANCE_IP
               valueFrom:
                 fieldRef:
@@ -136,6 +146,9 @@ spec:
                   fieldPath: metadata.namespace
             - name: SERVICE_PORT
               value: "17070"
+          volumeMounts:
+            - mountPath: /var/lib/dubbo/data
+              name: dubbo-data
           readinessProbe:
             tcpSocket:
               port: 17070
@@ -152,17 +165,20 @@ spec:
             timeoutSeconds: 2
             successThreshold: 1
             failureThreshold: 3
+      volumes:
+        - name: dubbo-data
+          emptyDir: {}
 ---
 apiVersion: apps/v1
 kind: Deployment
 metadata:
-  name: producer
+  name: consumer
   namespace: grpc-app
 spec:
   replicas: 1
   selector:
     matchLabels:
-      app: producer
+      app: consumer
   template:
     metadata:
       annotations:
@@ -170,23 +186,28 @@ spec:
         inject.dubbo.apache.org/templates: grpc-agent
         proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts": 
true}'
       labels:
-        app: producer
+        app: consumer
     spec:
       containers:
         - name: app
-          # Replace with your own image containing the producer binary
-          image: mfordjody/grpc-producer:dev-debug
+          # Replace with your own image containing the consumer binary
+          image: mfordjody/grpc-consumer:dev-debug
           imagePullPolicy: Always
-          # Optional: uncomment to test direct connection
-          # - --target=xds:///consumer.grpc-app.svc.cluster.local:7070
-          # - --count=10
           ports:
             - containerPort: 17171
               protocol: TCP
               name: grpc-test
           env:
+            - name: GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT
+              value: "true"
             - name: INSTANCE_IP
               valueFrom:
                 fieldRef:
                   apiVersion: v1
-                  fieldPath: status.podIP
\ No newline at end of file
+                  fieldPath: status.podIP
+          volumeMounts:
+            - mountPath: /var/lib/dubbo/data
+              name: dubbo-data
+      volumes:
+        - name: dubbo-data
+          emptyDir: {}
\ No newline at end of file
diff --git a/tests/grpc-app/README.md b/tests/grpc-app/README.md
index adcff343..f8f9098f 100644
--- a/tests/grpc-app/README.md
+++ b/tests/grpc-app/README.md
@@ -4,8 +4,8 @@ This is a test example for gRPC proxyless service mesh based on 
[Istio's blog po
 
 ## Architecture
 
-- **Consumer**: gRPC server with xDS support (port 17070)
-- **Producer**: gRPC client with xDS support + test server (port 17171)
+- **Producer**: gRPC server with xDS support (port 17070). This service is 
deployed with multiple versions (v1/v2) to demonstrate gray 
release/traffic-splitting scenarios and exposes gRPC reflection so `grpcurl` 
can query it directly.
+- **Consumer**: gRPC client with xDS support + test server (port 17171). This 
component drives load toward the producer service for automated tests.
 
 Both services use `dubbo-proxy` sidecar as an xDS proxy to connect to the 
control plane. The sidecar runs an xDS proxy server that listens on a Unix 
Domain Socket (UDS) at `/etc/dubbo/proxy/XDS`. The gRPC applications connect to 
this xDS proxy via the UDS socket using the `GRPC_XDS_BOOTSTRAP` environment 
variable.
 
diff --git a/tests/grpc-app/consumer/main.go b/tests/grpc-app/consumer/main.go
index 89da67e9..a9916f3a 100644
--- a/tests/grpc-app/consumer/main.go
+++ b/tests/grpc-app/consumer/main.go
@@ -19,6 +19,7 @@ package main
 
 import (
        "context"
+       "encoding/json"
        "flag"
        "fmt"
        "log"
@@ -26,105 +27,28 @@ import (
        "os"
        "os/signal"
        "regexp"
-       "strconv"
        "strings"
+       "sync"
        "syscall"
        "time"
 
        "google.golang.org/grpc"
+       "google.golang.org/grpc/connectivity"
        "google.golang.org/grpc/credentials/insecure"
        xdscreds "google.golang.org/grpc/credentials/xds"
        "google.golang.org/grpc/grpclog"
        "google.golang.org/grpc/reflection"
-       "google.golang.org/grpc/xds"
+       "google.golang.org/grpc/status"
+       _ "google.golang.org/grpc/xds"
 
        pb "github.com/apache/dubbo-kubernetes/tests/grpc-app/proto"
 )
 
 var (
-       port = flag.Int("port", 17070, "gRPC server port")
+       port       = flag.Int("port", 17171, "gRPC server port for ForwardEcho 
testing")
+       testServer *grpc.Server
 )
 
-type echoServer struct {
-       pb.UnimplementedEchoServiceServer
-       pb.UnimplementedEchoTestServiceServer
-       hostname       string
-       serviceVersion string
-       namespace      string
-       instanceIP     string
-       cluster        string
-       servicePort    int
-}
-
-func (s *echoServer) Echo(ctx context.Context, req *pb.EchoRequest) 
(*pb.EchoResponse, error) {
-       if req == nil {
-               return nil, fmt.Errorf("request is nil")
-       }
-       log.Printf("Received: %v", req.Message)
-       return &pb.EchoResponse{
-               Message:        req.Message,
-               Hostname:       s.hostname,
-               ServiceVersion: s.serviceVersion,
-               Namespace:      s.namespace,
-               Ip:             s.instanceIP,
-               Cluster:        s.cluster,
-               ServicePort:    int32(s.servicePort),
-       }, nil
-}
-
-func (s *echoServer) StreamEcho(req *pb.EchoRequest, stream 
pb.EchoService_StreamEchoServer) error {
-       if req == nil {
-               return fmt.Errorf("request is nil")
-       }
-       if stream == nil {
-               return fmt.Errorf("stream is nil")
-       }
-       log.Printf("StreamEcho received: %v", req.Message)
-       for i := 0; i < 3; i++ {
-               if err := stream.Send(&pb.EchoResponse{
-                       Message:  fmt.Sprintf("%s [%d]", req.Message, i),
-                       Hostname: s.hostname,
-               }); err != nil {
-                       log.Printf("StreamEcho send error: %v", err)
-                       return err
-               }
-       }
-       return nil
-}
-
-func (s *echoServer) ForwardEcho(ctx context.Context, req 
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
-       if req == nil {
-               return nil, fmt.Errorf("request is nil")
-       }
-
-       count := req.Count
-       if count < 0 {
-               count = 0
-       }
-       if count > 100 {
-               count = 100
-       }
-
-       log.Printf("ForwardEcho called: url=%s, count=%d", req.Url, count)
-
-       output := make([]string, 0, count)
-       for i := int32(0); i < count; i++ {
-               line := fmt.Sprintf("[%d body] Hostname=%s ServiceVersion=%s 
ServicePort=%d Namespace=%s",
-                       i, s.hostname, s.serviceVersion, s.servicePort, 
s.namespace)
-               if s.instanceIP != "" {
-                       line += fmt.Sprintf(" IP=%s", s.instanceIP)
-               }
-               if s.cluster != "" {
-                       line += fmt.Sprintf(" Cluster=%s", s.cluster)
-               }
-               output = append(output, line)
-       }
-
-       return &pb.ForwardEchoResponse{
-               Output: output,
-       }, nil
-}
-
 // grpcLogger filters out xDS informational logs that are incorrectly marked 
as ERROR
 type grpcLogger struct {
        logger *log.Logger
@@ -158,7 +82,6 @@ func cleanMessage(msg string) string {
 
 func (l *grpcLogger) Info(args ...interface{}) {
        msg := fmt.Sprint(args...)
-       // Filter out xDS "entering mode: SERVING" logs
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, 
"SERVING") {
                return
        }
@@ -201,18 +124,9 @@ func (l *grpcLogger) Warningf(format string, args 
...interface{}) {
 
 func (l *grpcLogger) Error(args ...interface{}) {
        msg := fmt.Sprint(args...)
-       // Filter out xDS "entering mode: SERVING" logs that are incorrectly 
marked as ERROR
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, 
"SERVING") {
                return
        }
-       // Filter out common connection reset errors - these are normal network 
behavior
-       // when clients disconnect before completing the HTTP/2 handshake
-       if strings.Contains(msg, "connection reset by peer") ||
-               strings.Contains(msg, "failed to receive the preface from 
client") ||
-               strings.Contains(msg, "connection error") {
-               // These are normal network events, log at DEBUG level instead 
of ERROR
-               return
-       }
        msg = cleanMessage(msg)
        l.logger.Print("ERROR: ", msg)
 }
@@ -222,12 +136,6 @@ func (l *grpcLogger) Errorln(args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, 
"SERVING") {
                return
        }
-       // Filter out common connection reset errors - these are normal network 
behavior
-       if strings.Contains(msg, "connection reset by peer") ||
-               strings.Contains(msg, "failed to receive the preface from 
client") ||
-               strings.Contains(msg, "connection error") {
-               return
-       }
        msg = cleanMessage(msg)
        l.logger.Print("ERROR: ", msg)
 }
@@ -237,12 +145,6 @@ func (l *grpcLogger) Errorf(format string, args 
...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, 
"SERVING") {
                return
        }
-       // Filter out common connection reset errors - these are normal network 
behavior
-       if strings.Contains(msg, "connection reset by peer") ||
-               strings.Contains(msg, "failed to receive the preface from 
client") ||
-               strings.Contains(msg, "connection error") {
-               return
-       }
        msg = cleanMessage(msg)
        l.logger.Printf("ERROR: %s", msg)
 }
@@ -263,139 +165,578 @@ func (l *grpcLogger) V(level int) bool {
        return level <= 0
 }
 
-// waitForBootstrapFile waits for the grpc-bootstrap.json file to exist
-// This is necessary because the dubbo-proxy sidecar needs time to generate 
the file
-func waitForBootstrapFile(bootstrapPath string, maxWait time.Duration) error {
-       log.Printf("Waiting for bootstrap file to exist: %s (max wait: %v)", 
bootstrapPath, maxWait)
+func main() {
+       flag.Parse()
 
-       ctx, cancel := context.WithTimeout(context.Background(), maxWait)
-       defer cancel()
+       // Set custom gRPC logger to filter out xDS informational logs
+       // The "ERROR: [xds] Listener entering mode: SERVING" is actually an 
informational log
+       grpclog.SetLoggerV2(&grpcLogger{
+               logger: log.New(os.Stderr, "", log.LstdFlags),
+       })
 
-       ticker := time.NewTicker(500 * time.Millisecond)
-       defer ticker.Stop()
+       go startTestServer(*port)
 
-       startTime := time.Now()
-       for {
-               // Check if file exists and is not empty
-               if info, err := os.Stat(bootstrapPath); err == nil && 
info.Size() > 0 {
-                       log.Printf("Bootstrap file found after %v: %s", 
time.Since(startTime), bootstrapPath)
-                       return nil
-               }
+       log.Printf("Consumer running. Test server listening on port %d for 
ForwardEcho", *port)
 
-               // Check for timeout
-               select {
-               case <-ctx.Done():
-                       return fmt.Errorf("timeout waiting for bootstrap file: 
%s (waited %v)", bootstrapPath, time.Since(startTime))
-               case <-ticker.C:
-                       // Continue waiting
-               }
-       }
-}
+       sigChan := make(chan os.Signal, 1)
+       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+       <-sigChan
+       log.Println("Shutting down...")
 
-func firstNonEmpty(values ...string) string {
-       for _, v := range values {
-               if strings.TrimSpace(v) != "" {
-                       return v
-               }
+       if testServer != nil {
+               log.Println("Stopping test server...")
+               testServer.GracefulStop()
        }
-       return ""
 }
 
-func main() {
-       flag.Parse()
+func startTestServer(port int) {
+       lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
+       if err != nil {
+               log.Printf("Failed to listen on port %d: %v", port, err)
+               return
+       }
 
-       // Set custom gRPC logger to filter out xDS informational logs
-       // The "ERROR: [xds] Listener entering mode: SERVING" is actually an 
informational log
-       grpclog.SetLoggerV2(&grpcLogger{
-               logger: log.New(os.Stderr, "", log.LstdFlags),
+       testServer = grpc.NewServer()
+       pb.RegisterEchoTestServiceServer(testServer, &testServerImpl{
+               connCache: make(map[string]*cachedConnection),
        })
+       reflection.Register(testServer)
 
-       hostname, _ := os.Hostname()
-       if hostname == "" {
-               hostname = "unknown"
+       log.Printf("Test server listening on port %d for ForwardEcho 
(reflection enabled)", port)
+       if err := testServer.Serve(lis); err != nil {
+               log.Printf("Test server error: %v", err)
        }
+}
+
+type cachedConnection struct {
+       conn      *grpc.ClientConn
+       createdAt time.Time
+}
 
-       namespace := firstNonEmpty(os.Getenv("SERVICE_NAMESPACE"), 
os.Getenv("POD_NAMESPACE"), "default")
-       serviceVersion := firstNonEmpty(
-               os.Getenv("SERVICE_VERSION"),
-               os.Getenv("POD_VERSION"),
-               os.Getenv("VERSION"),
-       )
-       if serviceVersion == "" {
-               serviceVersion = "unknown"
+type testServerImpl struct {
+       pb.UnimplementedEchoTestServiceServer
+       // Connection cache: map from URL to cached connection
+       connCache map[string]*cachedConnection
+       connMutex sync.RWMutex
+}
+
+// formatGRPCError formats gRPC errors similar to grpcurl output
+func formatGRPCError(err error, index int32, total int32) string {
+       if err == nil {
+               return ""
        }
-       cluster := os.Getenv("SERVICE_CLUSTER")
-       instanceIP := os.Getenv("INSTANCE_IP")
-       servicePort := *port
-       if sp := os.Getenv("SERVICE_PORT"); sp != "" {
-               if parsed, err := strconv.Atoi(sp); err == nil {
-                       servicePort = parsed
+
+       // Extract gRPC status code and message using status package
+       code := "Unknown"
+       message := err.Error()
+
+       // Try to extract gRPC status
+       if st, ok := status.FromError(err); ok {
+               code = st.Code().String()
+               message = st.Message()
+       } else {
+               // Fallback: try to extract from error string
+               if strings.Contains(message, "code = ") {
+                       // Extract code like "code = Unavailable"
+                       codeMatch := regexp.MustCompile(`code = 
(\w+)`).FindStringSubmatch(message)
+                       if len(codeMatch) > 1 {
+                               code = codeMatch[1]
+                       }
+                       // Extract message after "desc = "
+                       descMatch := regexp.MustCompile(`desc = 
"?([^"]+)"?`).FindStringSubmatch(message)
+                       if len(descMatch) > 1 {
+                               message = descMatch[1]
+                       } else {
+                               // If no desc, try to extract message after code
+                               parts := strings.SplitN(message, "desc = ", 2)
+                               if len(parts) > 1 {
+                                       message = strings.Trim(parts[1], `"`)
+                               }
+                       }
                }
        }
 
-       // Get bootstrap file path from environment variable or use default
+       // grpcurl-style block for a single call; one line per call otherwise
+       if total == 1 {
+               return fmt.Sprintf("ERROR:\nCode: %s\nMessage: %s", code, 
message)
+       }
+       return fmt.Sprintf("[%d] Error: rpc error: code = %s desc = %s", index, 
code, message)
+}
+
+func (s *testServerImpl) ForwardEcho(ctx context.Context, req 
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
+       if req == nil {
+               return nil, fmt.Errorf("request is nil")
+       }
+
+       if req.Url == "" {
+               return nil, fmt.Errorf("url is required")
+       }
+
+       count := req.Count
+       if count < 0 {
+               count = 0
+       }
+       if count > 100 {
+               count = 100
+       }
+
+       log.Printf("ForwardEcho: url=%s, count=%d", req.Url, count)
+
+       // Check bootstrap configuration
        bootstrapPath := os.Getenv("GRPC_XDS_BOOTSTRAP")
        if bootstrapPath == "" {
-               bootstrapPath = "/etc/dubbo/proxy/grpc-bootstrap.json"
-               log.Printf("GRPC_XDS_BOOTSTRAP not set, using default: %s", 
bootstrapPath)
+               return nil, fmt.Errorf("GRPC_XDS_BOOTSTRAP environment variable 
is not set")
        }
 
-       // Wait for bootstrap file to exist before creating xDS server
-       // The dubbo-proxy sidecar needs time to generate this file
-       if err := waitForBootstrapFile(bootstrapPath, 60*time.Second); err != 
nil {
-               log.Fatalf("Failed to wait for bootstrap file: %v", err)
+       // Verify bootstrap file exists
+       if _, err := os.Stat(bootstrapPath); os.IsNotExist(err) {
+               return nil, fmt.Errorf("bootstrap file does not exist: %s", 
bootstrapPath)
        }
 
-       // Create xDS-enabled gRPC server
-       // For proxyless gRPC, we use xds.NewGRPCServer() instead of 
grpc.NewServer()
-       creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
-               FallbackCreds: insecure.NewCredentials(),
-       })
+       // Read bootstrap file to verify UDS socket
+       bootstrapData, err := os.ReadFile(bootstrapPath)
        if err != nil {
-               log.Fatalf("Failed to create xDS server credentials: %v", err)
+               return nil, fmt.Errorf("failed to read bootstrap file: %v", err)
        }
 
-       server, err := xds.NewGRPCServer(grpc.Creds(creds))
-       if err != nil {
-               log.Fatalf("Failed to create xDS gRPC server: %v", err)
+       var bootstrapJSON map[string]interface{}
+       if err := json.Unmarshal(bootstrapData, &bootstrapJSON); err != nil {
+               return nil, fmt.Errorf("failed to parse bootstrap file: %v", 
err)
        }
 
-       es := &echoServer{
-               hostname:       hostname,
-               serviceVersion: serviceVersion,
-               namespace:      namespace,
-               instanceIP:     instanceIP,
-               cluster:        cluster,
-               servicePort:    servicePort,
+       // Extract UDS socket path
+       var udsPath string
+       if xdsServers, ok := bootstrapJSON["xds_servers"].([]interface{}); ok 
&& len(xdsServers) > 0 {
+               if server, ok := xdsServers[0].(map[string]interface{}); ok {
+                       if serverURI, ok := server["server_uri"].(string); ok {
+                               if strings.HasPrefix(serverURI, "unix://") {
+                                       udsPath = strings.TrimPrefix(serverURI, 
"unix://")
+                                       if _, err := os.Stat(udsPath); 
os.IsNotExist(err) {
+                                               return nil, fmt.Errorf("UDS 
socket does not exist: %s", udsPath)
+                                       }
+                               }
+                       }
+               }
        }
-       pb.RegisterEchoServiceServer(server, es)
-       pb.RegisterEchoTestServiceServer(server, es)
-       // Enable reflection API for grpcurl to discover services
-       reflection.Register(server)
 
-       lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
-       if err != nil {
-               log.Fatalf("Failed to listen: %v", err)
+       // CRITICAL: Reuse connections to avoid creating new xDS connections 
for each RPC call
+       // This prevents the RDS request loop issue and ensures stable 
connection state
+       s.connMutex.RLock()
+       cached, exists := s.connCache[req.Url]
+       var conn *grpc.ClientConn
+       if exists && cached != nil {
+               conn = cached.conn
+       }
+       s.connMutex.RUnlock()
+
+       // CRITICAL: Check if cached connection is still valid and not too old.
+       // When xDS config changes (e.g., TLS is added/removed), gRPC xDS 
client should update connections,
+       // but if the connection was established before xDS config was 
received, it may use old configuration.
+       // To ensure we use the latest xDS config, we clear connections older 
than 10 seconds.
+       // This ensures that when SubsetRule or PeerAuthentication is 
created/updated, connections are
+       // rebuilt quickly to use the new configuration.
+       const maxConnectionAge = 10 * time.Second
+       if exists && conn != nil {
+               state := conn.GetState()
+               if state == connectivity.Shutdown {
+                       // Connection is closed, remove from cache
+                       log.Printf("ForwardEcho: cached connection for %s is 
SHUTDOWN, removing from cache", req.Url)
+                       s.connMutex.Lock()
+                       delete(s.connCache, req.Url)
+                       conn = nil
+                       exists = false
+                       s.connMutex.Unlock()
+               } else if time.Since(cached.createdAt) > maxConnectionAge {
+                       // Connection is too old, may be using stale xDS config 
(e.g., plaintext when TLS is required)
+                       // Clear cache to force reconnection with latest xDS 
config
+                       log.Printf("ForwardEcho: cached connection for %s is 
too old (%v), clearing cache to use latest xDS config", req.Url, 
time.Since(cached.createdAt))
+                       s.connMutex.Lock()
+                       if cachedConn, stillExists := s.connCache[req.Url]; 
stillExists && cachedConn != nil && cachedConn.conn != nil {
+                               cachedConn.conn.Close()
+                       }
+                       delete(s.connCache, req.Url)
+                       conn = nil
+                       exists = false
+                       s.connMutex.Unlock()
+               }
+       }
+
+       if !exists || conn == nil {
+               // Create new connection
+               s.connMutex.Lock()
+               // Double-check after acquiring write lock
+               if cached, exists = s.connCache[req.Url]; !exists || cached == 
nil || cached.conn == nil {
+                       conn = nil
+                       // CRITICAL: When TLS is configured (SubsetRule 
ISTIO_MUTUAL), gRPC xDS client needs
+                       // to fetch certificates from CertificateProvider. The 
CertificateProvider uses file_watcher
+                       // to read certificate files. If the files are not 
ready or CertificateProvider is not
+                       // initialized, certificate fetching will timeout.
+                       // We wait a short time to ensure CertificateProvider 
is ready and certificate files are accessible.
+                       // This is especially important when SubsetRule is just 
created and TLS is enabled.
+                       // The CertificateProvider may need time to initialize, 
especially on first connection.
+                       // We wait 3 seconds to give CertificateProvider enough 
time to initialize (reduced from 5s for faster startup).
+                       log.Printf("ForwardEcho: waiting 3 seconds to ensure 
CertificateProvider is ready...")
+                       time.Sleep(3 * time.Second)
+
+                       // Create xDS client credentials
+                       // NOTE: FallbackCreds is REQUIRED by gRPC xDS library 
for initial connection
+                       // before xDS configuration is available. However, once 
xDS configures TLS,
+                       // the client will use TLS and will NOT fallback to 
plaintext if TLS fails.
+                       // FallbackCreds is only used when xDS has not yet 
provided TLS configuration.
+                       creds, err := 
xdscreds.NewClientCredentials(xdscreds.ClientOptions{
+                               FallbackCreds: insecure.NewCredentials(),
+                       })
+                       if err != nil {
+                               s.connMutex.Unlock()
+                               return nil, fmt.Errorf("failed to create xDS 
client credentials: %v", err)
+                       }
+
+                       // Dial with xDS URL - use background context, not the 
request context
+                       // The request context might timeout before xDS 
configuration is received
+                       // CRITICAL: When TLS is configured (SubsetRule 
ISTIO_MUTUAL), gRPC xDS client needs
+                       // to fetch certificates from CertificateProvider. This 
may take time, especially on
+                       // first connection. We use a longer timeout context to 
allow certificate fetching.
+                       log.Printf("ForwardEcho: creating new connection for 
%s...", req.Url)
+                       dialCtx, dialCancel := 
context.WithTimeout(context.Background(), 60*time.Second)
+                       conn, err = grpc.DialContext(dialCtx, req.Url, 
grpc.WithTransportCredentials(creds))
+                       dialCancel()
+                       if err != nil {
+                               s.connMutex.Unlock()
+                               return nil, fmt.Errorf("failed to dial %s: %v", 
req.Url, err)
+                       }
+                       s.connCache[req.Url] = &cachedConnection{
+                               conn:      conn,
+                               createdAt: time.Now(),
+                       }
+                       log.Printf("ForwardEcho: cached connection for %s", 
req.Url)
+               }
+               s.connMutex.Unlock()
+       } else {
+               log.Printf("ForwardEcho: reusing cached connection for %s 
(state: %v)", req.Url, conn.GetState())
+               // NOTE: We reuse the cached connection. If xDS config changes 
(e.g., TLS is added/removed),
+               // gRPC xDS client should automatically update the connection. 
However, if the connection
+               // was established before xDS config was received, it may still 
be using old configuration.
+               // We rely on the TLS mismatch detection logic in the RPC error 
handling to handle this case.
        }
 
-       log.Printf("Starting gRPC proxyless server on port %d (hostname: %s)", 
*port, hostname)
-
-       go func() {
-               sigChan := make(chan os.Signal, 1)
-               signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
-               <-sigChan
-               log.Println("Shutting down server...")
-               server.GracefulStop()
-       }()
-
-       // Serve the gRPC server
-       // Note: server.Serve returns when the listener is closed, which is 
normal during shutdown
-       // Connection reset errors are handled by the gRPC library and logged 
separately
-       if err := server.Serve(lis); err != nil {
-               // Only log as fatal if it's not a normal shutdown (listener 
closed)
-               if !strings.Contains(err.Error(), "use of closed network 
connection") {
-                       log.Fatalf("Failed to serve: %v", err)
+       initialState := conn.GetState()
+       log.Printf("ForwardEcho: initial connection state: %v", initialState)
+
+       // CRITICAL: Even if connection is READY, we need to verify it's still 
valid
+       // because xDS configuration may have changed (e.g., from plaintext to 
TLS)
+       // and the cached connection might be using old configuration.
+       // gRPC xDS client should automatically update connections, but if the 
connection
+       // was established before xDS config was received, it might be using 
FallbackCreds (plaintext).
+       // We'll proceed with RPC calls, but if they fail with TLS/plaintext 
mismatch errors,
+       // we'll clear the cache and retry.
+       // CRITICAL: When TLS is configured (SubsetRule ISTIO_MUTUAL), gRPC xDS 
client needs
+       // to fetch certificates from CertificateProvider during TLS handshake. 
The TLS handshake
+       // happens when the connection state transitions to READY. If 
CertificateProvider is not ready,
+       // the TLS handshake will timeout. We need to wait for the connection 
to be READY, which
+       // indicates that TLS handshake has completed successfully.
+       if initialState == connectivity.Ready {
+               log.Printf("ForwardEcho: connection is already READY, 
proceeding with RPC calls (will retry with new connection if TLS/plaintext 
mismatch detected)")
+       } else {
+               // Only wait for new connections or connections that are not 
READY
+               // For gRPC xDS proxyless, we need to wait for the client to 
receive and process LDS/CDS/EDS
+               // The connection state may transition: IDLE -> CONNECTING -> 
READY (or TRANSIENT_FAILURE -> CONNECTING -> READY)
+               // CRITICAL: When TLS is configured, the TLS handshake happens 
during this state transition.
+               // If CertificateProvider is not ready, the TLS handshake will 
timeout and connection will fail.
+               // We use a longer timeout (60 seconds) to allow 
CertificateProvider to fetch certificates.
+               log.Printf("ForwardEcho: waiting for xDS configuration to be 
processed and connection to be ready (60 seconds)...")
+
+               // Wait for state changes with multiple attempts
+               maxWait := 60 * time.Second
+
+               // Wait for state changes, allowing multiple state transitions
+               // CRITICAL: Don't exit on TRANSIENT_FAILURE - it may recover 
to READY
+               stateChanged := false
+               currentState := initialState
+               startTime := time.Now()
+               lastStateChangeTime := startTime
+
+               for time.Since(startTime) < maxWait {
+                       if currentState == connectivity.Ready {
+                               log.Printf("ForwardEcho: connection is READY 
after %v", time.Since(startTime))
+                               stateChanged = true
+                               break
+                       }
+
+                       // Only exit on Shutdown, not on TransientFailure (it 
may recover)
+                       if currentState == connectivity.Shutdown {
+                               log.Printf("ForwardEcho: connection in %v state 
after %v, cannot recover", currentState, time.Since(startTime))
+                               break
+                       }
+
+                       // Wait for state change with remaining timeout
+                       remaining := maxWait - time.Since(startTime)
+                       if remaining <= 0 {
+                               break
+                       }
+
+                       // Use shorter timeout for each WaitForStateChange call 
to allow periodic checks
+                       waitTimeout := remaining
+                       if waitTimeout > 5*time.Second {
+                               waitTimeout = 5 * time.Second
+                       }
+
+                       stateCtx, stateCancel := 
context.WithTimeout(context.Background(), waitTimeout)
+                       if conn.WaitForStateChange(stateCtx, currentState) {
+                               newState := conn.GetState()
+                               elapsed := time.Since(startTime)
+                               log.Printf("ForwardEcho: connection state 
changed from %v to %v after %v", currentState, newState, elapsed)
+                               stateChanged = true
+                               currentState = newState
+                               lastStateChangeTime = time.Now()
+
+                               // If READY, we're done
+                               if newState == connectivity.Ready {
+                                       stateCancel()
+                                       break
+                               }
+
+                               // If we're in TRANSIENT_FAILURE, continue 
waiting - it may recover
+                               // gRPC xDS client will retry connection when 
endpoints become available
+                               if newState == connectivity.TransientFailure {
+                                       log.Printf("ForwardEcho: connection in 
TRANSIENT_FAILURE, continuing to wait for recovery (remaining: %v)", 
maxWait-elapsed)
+                               }
+                       } else {
+                               // Timeout waiting for state change - check if 
we should continue
+                               elapsed := time.Since(startTime)
+                               if currentState == 
connectivity.TransientFailure {
+                                       // If we've been in TRANSIENT_FAILURE 
for a while, continue waiting
+                                       // The connection may recover when 
endpoints become available
+                                       if time.Since(lastStateChangeTime) < 
10*time.Second {
+                                               log.Printf("ForwardEcho: still 
in TRANSIENT_FAILURE after %v, continuing to wait (remaining: %v)", elapsed, 
maxWait-elapsed)
+                                       } else {
+                                               log.Printf("ForwardEcho: no 
state change after %v, current state: %v (remaining: %v)", elapsed, 
currentState, maxWait-elapsed)
+                                       }
+                               } else {
+                                       log.Printf("ForwardEcho: no state 
change after %v, current state: %v (remaining: %v)", elapsed, currentState, 
maxWait-elapsed)
+                               }
+                       }
+                       stateCancel()
+               }
+
+               finalState := conn.GetState()
+               log.Printf("ForwardEcho: final connection state: %v 
(stateChanged=%v, waited=%v)", finalState, stateChanged, time.Since(startTime))
+
+               // If connection is not READY, log a warning but proceed anyway
+               // The first RPC call may trigger connection establishment
+               if finalState != connectivity.Ready {
+                       log.Printf("ForwardEcho: WARNING - connection is not 
READY (state=%v), but proceeding with RPC calls", finalState)
                }
-               log.Printf("Server stopped: %v", err)
        }
+
+       // Create client and make RPC calls
+       client := pb.NewEchoServiceClient(conn)
+       output := make([]string, 0, count)
+
+       log.Printf("ForwardEcho: sending %d requests...", count)
+       errorCount := 0
+       firstError := ""
+       for i := int32(0); i < count; i++ {
+               echoReq := &pb.EchoRequest{
+                       Message: fmt.Sprintf("Request %d", i+1),
+               }
+
+               currentState := conn.GetState()
+               log.Printf("ForwardEcho: sending request %d (connection state: 
%v)...", i+1, currentState)
+
+               // Use longer timeout for requests to allow TLS handshake 
completion
+               // When mTLS is configured, certificate fetching and TLS 
handshake may take time
+               // Use 30 seconds for all requests to ensure TLS handshake has 
enough time
+               timeout := 30 * time.Second
+
+               reqCtx, reqCancel := context.WithTimeout(context.Background(), 
timeout)
+               reqStartTime := time.Now()
+               resp, err := client.Echo(reqCtx, echoReq)
+               duration := time.Since(reqStartTime)
+               reqCancel()
+
+               // Check connection state after RPC call
+               stateAfterRPC := conn.GetState()
+               log.Printf("ForwardEcho: request %d completed in %v, connection 
state: %v (was %v)", i+1, duration, stateAfterRPC, currentState)
+
+               if err != nil {
+                       log.Printf("ForwardEcho: request %d failed: %v", i+1, 
err)
+                       errorCount++
+                       if firstError == "" {
+                               firstError = err.Error()
+                       }
+
+                       // Format error similar to grpcurl output
+                       errMsg := formatGRPCError(err, i, count)
+                       output = append(output, errMsg)
+
+                       // CRITICAL: Only clear cache if we detect specific 
TLS/plaintext mismatch errors.
+                       // TRANSIENT_FAILURE can occur for many reasons (e.g., 
xDS config updates, endpoint changes),
+                       // so we should NOT clear cache on every 
TRANSIENT_FAILURE.
+                       // Only clear cache when we detect explicit TLS-related 
errors that indicate a mismatch.
+                       errStr := err.Error()
+                       isTLSMismatch := false
+
+                       // Check for specific TLS/plaintext mismatch indicators:
+                       // - "tls: first record does not look like a TLS 
handshake" - client uses TLS but server is plaintext
+                       // - "authentication handshake failed" with TLS context 
- TLS handshake failed
+                       // - "context deadline exceeded" during authentication 
handshake - may indicate TLS handshake timeout
+                       //   (e.g., client using plaintext but server requiring 
TLS, or vice versa)
+                       // - "fetching trusted roots from CertificateProvider 
failed" - CertificateProvider not ready yet
+                       //   This is a temporary error that should be retried 
after waiting for CertificateProvider to be ready
+                       // These errors indicate that client and server TLS 
configuration are mismatched, or CertificateProvider is not ready
+                       if strings.Contains(errStr, "tls: first record does not 
look like a TLS handshake") ||
+                               (strings.Contains(errStr, "authentication 
handshake failed") && strings.Contains(errStr, "tls:")) ||
+                               (strings.Contains(errStr, "authentication 
handshake failed") && strings.Contains(errStr, "context deadline exceeded")) ||
+                               strings.Contains(errStr, "fetching trusted 
roots from CertificateProvider failed") {
+                               isTLSMismatch = true
+                               log.Printf("ForwardEcho: detected TLS/plaintext 
mismatch or CertificateProvider not ready error: %v", err)
+                       }
+
+                       // CRITICAL: When TLS mismatch is detected, immediately 
clear cache and force reconnection.
+                       // This ensures that:
+                       // 1. If client config changed (plaintext -> TLS), new 
connection uses TLS
+                       // 2. If server config changed (TLS -> plaintext), new 
connection uses plaintext
+                       // 3. Connection behavior is consistent with current 
xDS configuration
+                       // According to Istio proxyless gRPC behavior:
+                       // - When only client TLS (SubsetRule ISTIO_MUTUAL) but 
server plaintext: connection SHOULD FAIL
+                       // - When client TLS + server mTLS (PeerAuthentication 
STRICT): connection SHOULD SUCCEED
+                       // - When both plaintext: connection SHOULD SUCCEED
+                       // By clearing cache and reconnecting, we ensure 
connection uses current xDS config.
+                       if isTLSMismatch {
+                               // Check if this is a CertificateProvider not 
ready error (temporary) vs configuration mismatch (persistent)
+                               isCertProviderNotReady := 
strings.Contains(errStr, "fetching trusted roots from CertificateProvider 
failed")
+
+                               // Clear cache and force reconnection on every 
TLS mismatch detection
+                               // This ensures we always try to use the latest 
xDS configuration
+                               if isCertProviderNotReady {
+                                       log.Printf("ForwardEcho: WARNING - 
CertificateProvider not ready yet: %v", err)
+                                       log.Printf("ForwardEcho: NOTE - This is 
a temporary error. CertificateProvider needs time to initialize.")
+                                       log.Printf("ForwardEcho: Clearing 
connection cache and waiting for CertificateProvider to be ready...")
+                               } else {
+                                       log.Printf("ForwardEcho: WARNING - 
detected TLS/plaintext mismatch error: %v", err)
+                                       log.Printf("ForwardEcho: NOTE - This 
error indicates that client and server TLS configuration are mismatched")
+                                       log.Printf("ForwardEcho: This usually 
happens when:")
+                                       log.Printf("ForwardEcho:   1. 
SubsetRule with ISTIO_MUTUAL exists but PeerAuthentication with STRICT does not 
(client TLS, server plaintext)")
+                                       log.Printf("ForwardEcho:   2. 
SubsetRule was deleted but cached connection still uses TLS")
+                                       log.Printf("ForwardEcho: Clearing 
connection cache to force reconnection with updated xDS config...")
+                               }
+
+                               s.connMutex.Lock()
+                               if cachedConn, stillExists := 
s.connCache[req.Url]; stillExists && cachedConn != nil && cachedConn.conn != 
nil {
+                                       cachedConn.conn.Close()
+                               }
+                               delete(s.connCache, req.Url)
+                               conn = nil
+                               s.connMutex.Unlock()
+
+                               // CRITICAL: Wait for xDS config to propagate 
and be processed by gRPC xDS client.
+                               // When CDS/LDS config changes, it takes time 
for:
+                               // 1. Control plane to push new config to gRPC 
xDS client
+                               // 2. gRPC xDS client to process and apply new 
config
+                               // 3. CertificateProvider to be ready and 
certificate files to be accessible
+                               // 4. New connections to use updated config
+                               // For CertificateProvider not ready errors, we 
wait shorter time (3 seconds) as it's usually faster.
+                               // For configuration mismatch, we wait longer 
(10 seconds) to ensure config has propagated.
+                               // Reduced wait times for faster recovery while 
still ensuring reliability.
+                               if isCertProviderNotReady {
+                                       log.Printf("ForwardEcho: waiting 3 
seconds for CertificateProvider to be ready...")
+                                       time.Sleep(3 * time.Second)
+                               } else {
+                                       log.Printf("ForwardEcho: waiting 10 
seconds for xDS config to propagate and CertificateProvider to be ready...")
+                                       time.Sleep(10 * time.Second)
+                               }
+
+                               // Recreate connection - this will use current 
xDS config
+                               // CRITICAL: When TLS is configured, gRPC xDS 
client needs to fetch certificates
+                               // from CertificateProvider. Use a longer 
timeout to allow certificate fetching.
+                               log.Printf("ForwardEcho: recreating connection 
with current xDS config...")
+                               creds, credErr := 
xdscreds.NewClientCredentials(xdscreds.ClientOptions{
+                                       FallbackCreds: 
insecure.NewCredentials(),
+                               })
+                               if credErr != nil {
+                                       log.Printf("ForwardEcho: failed to 
create xDS client credentials: %v", credErr)
+                                       // Continue to record error
+                               } else {
+                                       // Wait additional time before dialing 
to ensure CertificateProvider is ready
+                                       // Reduced from 3s to 2s for faster 
recovery
+                                       log.Printf("ForwardEcho: waiting 2 
seconds before dialing to ensure CertificateProvider is ready...")
+                                       time.Sleep(2 * time.Second)
+
+                                       dialCtx, dialCancel := 
context.WithTimeout(context.Background(), 60*time.Second)
+                                       newConn, dialErr := 
grpc.DialContext(dialCtx, req.Url, grpc.WithTransportCredentials(creds))
+                                       dialCancel()
+                                       if dialErr != nil {
+                                               log.Printf("ForwardEcho: failed 
to dial %s: %v", req.Url, dialErr)
+                                               // Continue to record error
+                                       } else {
+                                               s.connMutex.Lock()
+                                               s.connCache[req.Url] = 
&cachedConnection{
+                                                       conn:      newConn,
+                                                       createdAt: time.Now(),
+                                               }
+                                               conn = newConn
+                                               client = 
pb.NewEchoServiceClient(conn)
+                                               s.connMutex.Unlock()
+                                               log.Printf("ForwardEcho: 
connection recreated, retrying request %d", i+1)
+                                               // Retry this request with new 
connection
+                                               continue
+                                       }
+                               }
+                       }
+                       if i < count-1 {
+                               waitTime := 2 * time.Second
+                               log.Printf("ForwardEcho: waiting %v before next 
request...", waitTime)
+                               time.Sleep(waitTime)
+                       }
+                       continue
+               }
+
+               if resp == nil {
+                       log.Printf("ForwardEcho: request %d failed: response is 
nil", i+1)
+                       output = append(output, fmt.Sprintf("[%d] Error: 
response is nil", i))
+                       continue
+               }
+
+               log.Printf("ForwardEcho: request %d succeeded: Hostname=%s 
ServiceVersion=%s Namespace=%s IP=%s",
+                       i+1, resp.Hostname, resp.ServiceVersion, 
resp.Namespace, resp.Ip)
+
+               lineParts := []string{
+                       fmt.Sprintf("[%d body] Hostname=%s", i, resp.Hostname),
+               }
+               if resp.ServiceVersion != "" {
+                       lineParts = append(lineParts, 
fmt.Sprintf("ServiceVersion=%s", resp.ServiceVersion))
+               }
+               if resp.Namespace != "" {
+                       lineParts = append(lineParts, 
fmt.Sprintf("Namespace=%s", resp.Namespace))
+               }
+               if resp.Ip != "" {
+                       lineParts = append(lineParts, fmt.Sprintf("IP=%s", 
resp.Ip))
+               }
+               if resp.Cluster != "" {
+                       lineParts = append(lineParts, fmt.Sprintf("Cluster=%s", 
resp.Cluster))
+               }
+               if resp.ServicePort > 0 {
+                       lineParts = append(lineParts, 
fmt.Sprintf("ServicePort=%d", resp.ServicePort))
+               }
+
+               output = append(output, strings.Join(lineParts, " "))
+
+               // Small delay between successful requests to avoid 
overwhelming the server
+               if i < count-1 {
+                       time.Sleep(100 * time.Millisecond)
+               }
+       }
+
+       log.Printf("ForwardEcho: completed %d requests", count)
+
+       // If all requests failed, add summary similar to grpcurl
+       if errorCount > 0 && errorCount == int(count) && firstError != "" {
+               summary := fmt.Sprintf("ERROR:\nCode: Unknown\nMessage: %d/%d 
requests had errors; first error: %s", errorCount, count, firstError)
+               // Prepend summary to output
+               output = append([]string{summary}, output...)
+       }
+
+       return &pb.ForwardEchoResponse{
+               Output: output,
+       }, nil
 }
diff --git a/tests/grpc-app/docker/dockerfile.consumer 
b/tests/grpc-app/docker/dockerfile.consumer
index 78aa4274..8c4def20 100644
--- a/tests/grpc-app/docker/dockerfile.consumer
+++ b/tests/grpc-app/docker/dockerfile.consumer
@@ -47,4 +47,7 @@ WORKDIR /app
 COPY --from=builder /build/grpc-consumer /usr/local/bin/grpc-consumer
 RUN chmod +x /usr/local/bin/grpc-consumer
 
+COPY ./grpcurl /usr/local/bin/grpcurl
+RUN chmod +x /usr/local/bin/grpcurl
+
 ENTRYPOINT ["/usr/local/bin/grpc-consumer"]
diff --git a/tests/grpc-app/producer/main.go b/tests/grpc-app/producer/main.go
index 8bb23199..52debbe5 100644
--- a/tests/grpc-app/producer/main.go
+++ b/tests/grpc-app/producer/main.go
@@ -19,7 +19,6 @@ package main
 
 import (
        "context"
-       "encoding/json"
        "flag"
        "fmt"
        "log"
@@ -27,29 +26,105 @@ import (
        "os"
        "os/signal"
        "regexp"
+       "strconv"
        "strings"
-       "sync"
        "syscall"
        "time"
 
        "google.golang.org/grpc"
-       "google.golang.org/grpc/connectivity"
        "google.golang.org/grpc/credentials/insecure"
        xdscreds "google.golang.org/grpc/credentials/xds"
        "google.golang.org/grpc/grpclog"
        "google.golang.org/grpc/reflection"
-       _ "google.golang.org/grpc/xds"
+       "google.golang.org/grpc/xds"
 
        pb "github.com/apache/dubbo-kubernetes/tests/grpc-app/proto"
 )
 
 var (
-       target     = flag.String("target", "", "Target service address with 
xds:/// scheme (optional)")
-       count      = flag.Int("count", 5, "Number of requests to send")
-       port       = flag.Int("port", 17171, "gRPC server port for ForwardEcho 
testing")
-       testServer *grpc.Server
+       port = flag.Int("port", 17070, "gRPC server port")
 )
 
+// echoServer is the receiver for the Echo, StreamEcho and ForwardEcho
+// handlers and embeds the generated Unimplemented* server stubs from pb.
+// The remaining fields are identity metadata: Echo copies all of them into
+// its EchoResponse and ForwardEcho formats them into its output lines.
+type echoServer struct {
+       pb.UnimplementedEchoServiceServer
+       pb.UnimplementedEchoTestServiceServer
+       hostname       string
+       serviceVersion string
+       namespace      string
+       instanceIP     string
+       cluster        string
+       servicePort    int
+}
+
+// Echo answers a unary call by returning req.Message unchanged together
+// with the server's hostname, serviceVersion, namespace, instanceIP,
+// cluster and servicePort. A nil req yields a plain "request is nil"
+// error. ctx is accepted but not consulted by this handler.
+func (s *echoServer) Echo(ctx context.Context, req *pb.EchoRequest) 
(*pb.EchoResponse, error) {
+       if req == nil {
+               return nil, fmt.Errorf("request is nil")
+       }
+       log.Printf("Received: %v", req.Message)
+       return &pb.EchoResponse{
+               Message:        req.Message,
+               Hostname:       s.hostname,
+               ServiceVersion: s.serviceVersion,
+               Namespace:      s.namespace,
+               Ip:             s.instanceIP,
+               Cluster:        s.cluster,
+               // servicePort is stored as int and converted to int32 here.
+               ServicePort:    int32(s.servicePort),
+       }, nil
+}
+
+// StreamEcho sends exactly three responses on stream. Each response
+// carries req.Message suffixed with its index ("<msg> [0]" .. "<msg> [2]")
+// and the server hostname; no other response fields are set. It returns an
+// error when req or stream is nil, or the first error returned by Send.
+func (s *echoServer) StreamEcho(req *pb.EchoRequest, stream 
pb.EchoService_StreamEchoServer) error {
+       if req == nil {
+               return fmt.Errorf("request is nil")
+       }
+       if stream == nil {
+               return fmt.Errorf("stream is nil")
+       }
+       log.Printf("StreamEcho received: %v", req.Message)
+       for i := 0; i < 3; i++ {
+               if err := stream.Send(&pb.EchoResponse{
+                       Message:  fmt.Sprintf("%s [%d]", req.Message, i),
+                       Hostname: s.hostname,
+               }); err != nil {
+                       // Stop at the first failed send and report it to the caller.
+                       log.Printf("StreamEcho send error: %v", err)
+                       return err
+               }
+       }
+       return nil
+}
+
+// ForwardEcho answers locally: req.Url is only logged, never dialed, and
+// ctx is not consulted. The reply holds count lines describing this
+// server, where count is req.Count clamped to the range [0, 100]. Each
+// line has the form
+//
+//     [i body] Hostname=... ServiceVersion=... ServicePort=... Namespace=...
+//
+// followed by " IP=..." and " Cluster=..." when those fields are non-empty.
+// A nil req yields a plain "request is nil" error.
+func (s *echoServer) ForwardEcho(ctx context.Context, req 
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
+       if req == nil {
+               return nil, fmt.Errorf("request is nil")
+       }
+
+       // Clamp the requested count to [0, 100].
+       count := req.Count
+       if count < 0 {
+               count = 0
+       }
+       if count > 100 {
+               count = 100
+       }
+
+       log.Printf("ForwardEcho called: url=%s, count=%d", req.Url, count)
+
+       // Every line is built from this server's own fields; only the index
+       // differs between lines.
+       output := make([]string, 0, count)
+       for i := int32(0); i < count; i++ {
+               line := fmt.Sprintf("[%d body] Hostname=%s ServiceVersion=%s 
ServicePort=%d Namespace=%s",
+                       i, s.hostname, s.serviceVersion, s.servicePort, 
s.namespace)
+               if s.instanceIP != "" {
+                       line += fmt.Sprintf(" IP=%s", s.instanceIP)
+               }
+               if s.cluster != "" {
+                       line += fmt.Sprintf(" Cluster=%s", s.cluster)
+               }
+               output = append(output, line)
+       }
+
+       return &pb.ForwardEchoResponse{
+               Output: output,
+       }, nil
+}
+
 // grpcLogger filters out xDS informational logs that are incorrectly marked 
as ERROR
 type grpcLogger struct {
        logger *log.Logger
@@ -83,6 +158,7 @@ func cleanMessage(msg string) string {
 
 func (l *grpcLogger) Info(args ...interface{}) {
        msg := fmt.Sprint(args...)
+       // Filter out xDS "entering mode: SERVING" logs
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, 
"SERVING") {
                return
        }
@@ -125,9 +201,18 @@ func (l *grpcLogger) Warningf(format string, args 
...interface{}) {
 
+// Error formats args with fmt.Sprint, passes the result through
+// cleanMessage and prints it with an "ERROR: " prefix. Messages matched by
+// either filter below are discarded without being printed.
 func (l *grpcLogger) Error(args ...interface{}) {
         msg := fmt.Sprint(args...)
+       // Filter out xDS "entering mode: SERVING" logs that are incorrectly 
marked as ERROR
         if strings.Contains(msg, "entering mode") && strings.Contains(msg, 
"SERVING") {
                 return
         }
+       // Filter out common connection reset errors - these are normal network 
behavior
+       // when clients disconnect before completing the HTTP/2 handshake
+       if strings.Contains(msg, "connection reset by peer") ||
+               strings.Contains(msg, "failed to receive the preface from 
client") ||
+               strings.Contains(msg, "connection error") {
+               // Treated as normal network events: the message is dropped
+               // entirely, nothing is logged for it at any level.
+               // NOTE(review): "connection error" is a broad substring;
+               // confirm it does not hide errors that should be reported.
+               return
+       }
         msg = cleanMessage(msg)
         l.logger.Print("ERROR: ", msg)
 }
@@ -137,6 +222,12 @@ func (l *grpcLogger) Errorln(args ...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, 
"SERVING") {
                return
        }
+       // Filter out common connection reset errors - these are normal network 
behavior
+       if strings.Contains(msg, "connection reset by peer") ||
+               strings.Contains(msg, "failed to receive the preface from 
client") ||
+               strings.Contains(msg, "connection error") {
+               return
+       }
        msg = cleanMessage(msg)
        l.logger.Print("ERROR: ", msg)
 }
@@ -146,6 +237,12 @@ func (l *grpcLogger) Errorf(format string, args 
...interface{}) {
        if strings.Contains(msg, "entering mode") && strings.Contains(msg, 
"SERVING") {
                return
        }
+       // Filter out common connection reset errors - these are normal network 
behavior
+       if strings.Contains(msg, "connection reset by peer") ||
+               strings.Contains(msg, "failed to receive the preface from 
client") ||
+               strings.Contains(msg, "connection error") {
+               return
+       }
        msg = cleanMessage(msg)
        l.logger.Printf("ERROR: %s", msg)
 }
@@ -166,398 +263,143 @@ func (l *grpcLogger) V(level int) bool {
        return level <= 0
 }
 
-func main() {
-       flag.Parse()
-
-       // Set custom gRPC logger to filter out xDS informational logs
-       // The "ERROR: [xds] Listener entering mode: SERVING" is actually an 
informational log
-       grpclog.SetLoggerV2(&grpcLogger{
-               logger: log.New(os.Stderr, "", log.LstdFlags),
-       })
-
-       go startTestServer(*port)
-
-       if *target != "" {
-               testDirectConnection(*target, *count)
-       }
-
-       log.Printf("Producer running. Test server listening on port %d for 
ForwardEcho", *port)
-
-       sigChan := make(chan os.Signal, 1)
-       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
-       <-sigChan
-       log.Println("Shutting down...")
-
-       if testServer != nil {
-               log.Println("Stopping test server...")
-               testServer.GracefulStop()
-       }
-}
-
-func testDirectConnection(target string, count int) {
-       creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
-               FallbackCreds: insecure.NewCredentials(),
-       })
-       if err != nil {
-               log.Fatalf("Failed to create xDS client credentials: %v", err)
-       }
-
-       ctx := context.Background()
-       conn, err := grpc.DialContext(ctx, target, 
grpc.WithTransportCredentials(creds))
-       if err != nil {
-               log.Fatalf("Failed to connect: %v", err)
-       }
-       defer conn.Close()
-
-       client := pb.NewEchoServiceClient(conn)
+// waitForBootstrapFile waits for the grpc-bootstrap.json file to exist
+// This is necessary because the dubbo-proxy sidecar needs time to generate 
the file
+func waitForBootstrapFile(bootstrapPath string, maxWait time.Duration) error {
+       log.Printf("Waiting for bootstrap file to exist: %s (max wait: %v)", 
bootstrapPath, maxWait)
 
-       log.Printf("Connected to %s, sending %d requests...", target, count)
+       ctx, cancel := context.WithTimeout(context.Background(), maxWait)
+       defer cancel()
 
-       for i := 0; i < count; i++ {
-               req := &pb.EchoRequest{
-                       Message: fmt.Sprintf("Hello from producer [%d]", i+1),
-               }
-
-               reqCtx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
-               resp, err := client.Echo(reqCtx, req)
-               cancel()
+       ticker := time.NewTicker(500 * time.Millisecond)
+       defer ticker.Stop()
 
-               if err != nil {
-                       log.Printf("Request %d failed: %v", i+1, err)
-                       continue
+       startTime := time.Now()
+       for {
+               // Check if file exists and is not empty
+               if info, err := os.Stat(bootstrapPath); err == nil && 
info.Size() > 0 {
+                       log.Printf("Bootstrap file found after %v: %s", 
time.Since(startTime), bootstrapPath)
+                       return nil
                }
 
-               if resp == nil {
-                       log.Printf("Request %d failed: response is nil", i+1)
-                       continue
+               // Check for timeout
+               select {
+               case <-ctx.Done():
+                       return fmt.Errorf("timeout waiting for bootstrap file: 
%s (waited %v)", bootstrapPath, time.Since(startTime))
+               case <-ticker.C:
+                       // Continue waiting
                }
-
-               log.Printf("Request %d: Response=%s, Hostname=%s", i+1, 
resp.Message, resp.Hostname)
-               time.Sleep(500 * time.Millisecond)
        }
-
-       log.Println("All requests completed")
 }
 
-func startTestServer(port int) {
-       lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
-       if err != nil {
-               log.Printf("Failed to listen on port %d: %v", port, err)
-               return
-       }
-
-       testServer = grpc.NewServer()
-       pb.RegisterEchoTestServiceServer(testServer, &testServerImpl{
-               connCache: make(map[string]*grpc.ClientConn),
-       })
-       reflection.Register(testServer)
-
-       log.Printf("Test server listening on port %d for ForwardEcho 
(reflection enabled)", port)
-       if err := testServer.Serve(lis); err != nil {
-               log.Printf("Test server error: %v", err)
+func firstNonEmpty(values ...string) string {
+       for _, v := range values {
+               if strings.TrimSpace(v) != "" {
+                       return v
+               }
        }
+       return ""
 }
 
-type testServerImpl struct {
-       pb.UnimplementedEchoTestServiceServer
-       // Connection cache: map from URL to gRPC connection
-       connCache map[string]*grpc.ClientConn
-       connMutex sync.RWMutex
-}
+func main() {
+       flag.Parse()
 
-func (s *testServerImpl) ForwardEcho(ctx context.Context, req 
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
-       if req == nil {
-               return nil, fmt.Errorf("request is nil")
-       }
+       // Set custom gRPC logger to filter out xDS informational logs
+       // The "ERROR: [xds] Listener entering mode: SERVING" is actually an 
informational log
+       grpclog.SetLoggerV2(&grpcLogger{
+               logger: log.New(os.Stderr, "", log.LstdFlags),
+       })
 
-       if req.Url == "" {
-               return nil, fmt.Errorf("url is required")
+       hostname, _ := os.Hostname()
+       if hostname == "" {
+               hostname = "unknown"
        }
 
-       count := req.Count
-       if count < 0 {
-               count = 0
+       namespace := firstNonEmpty(os.Getenv("SERVICE_NAMESPACE"), 
os.Getenv("POD_NAMESPACE"), "default")
+       serviceVersion := firstNonEmpty(
+               os.Getenv("SERVICE_VERSION"),
+               os.Getenv("POD_VERSION"),
+               os.Getenv("VERSION"),
+       )
+       if serviceVersion == "" {
+               serviceVersion = "unknown"
        }
-       if count > 100 {
-               count = 100
+       cluster := os.Getenv("SERVICE_CLUSTER")
+       instanceIP := os.Getenv("INSTANCE_IP")
+       servicePort := *port
+       if sp := os.Getenv("SERVICE_PORT"); sp != "" {
+               if parsed, err := strconv.Atoi(sp); err == nil {
+                       servicePort = parsed
+               }
        }
 
-       log.Printf("ForwardEcho: url=%s, count=%d", req.Url, count)
-
-       // Check bootstrap configuration
+       // Get bootstrap file path from environment variable or use default
        bootstrapPath := os.Getenv("GRPC_XDS_BOOTSTRAP")
        if bootstrapPath == "" {
-               return nil, fmt.Errorf("GRPC_XDS_BOOTSTRAP environment variable 
is not set")
+               bootstrapPath = "/etc/dubbo/proxy/grpc-bootstrap.json"
+               log.Printf("GRPC_XDS_BOOTSTRAP not set, using default: %s", 
bootstrapPath)
        }
 
-       // Verify bootstrap file exists
-       if _, err := os.Stat(bootstrapPath); os.IsNotExist(err) {
-               return nil, fmt.Errorf("bootstrap file does not exist: %s", 
bootstrapPath)
+       // Wait for bootstrap file to exist before creating xDS server
+       // The dubbo-proxy sidecar needs time to generate this file
+       if err := waitForBootstrapFile(bootstrapPath, 60*time.Second); err != 
nil {
+               log.Fatalf("Failed to wait for bootstrap file: %v", err)
        }
 
-       // Read bootstrap file to verify UDS socket
-       bootstrapData, err := os.ReadFile(bootstrapPath)
+       // Create xDS-enabled gRPC server
+       // For proxyless gRPC, we use xds.NewGRPCServer() instead of 
grpc.NewServer()
+       // NOTE: FallbackCreds is REQUIRED by the gRPC xDS credentials; creating 
them
+       // without a fallback fails. Once xDS supplies a TLS configuration, 
the
+       // server uses TLS and will NOT fall back to plaintext if TLS 
fails.
+       // FallbackCreds is only used when xDS provides no security 
configuration.
+       creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
+               FallbackCreds: insecure.NewCredentials(),
+       })
        if err != nil {
-               return nil, fmt.Errorf("failed to read bootstrap file: %v", err)
+               log.Fatalf("Failed to create xDS server credentials: %v", err)
        }
 
-       var bootstrapJSON map[string]interface{}
-       if err := json.Unmarshal(bootstrapData, &bootstrapJSON); err != nil {
-               return nil, fmt.Errorf("failed to parse bootstrap file: %v", 
err)
-       }
-
-       // Extract UDS socket path
-       var udsPath string
-       if xdsServers, ok := bootstrapJSON["xds_servers"].([]interface{}); ok 
&& len(xdsServers) > 0 {
-               if server, ok := xdsServers[0].(map[string]interface{}); ok {
-                       if serverURI, ok := server["server_uri"].(string); ok {
-                               if strings.HasPrefix(serverURI, "unix://") {
-                                       udsPath = strings.TrimPrefix(serverURI, 
"unix://")
-                                       if _, err := os.Stat(udsPath); 
os.IsNotExist(err) {
-                                               return nil, fmt.Errorf("UDS 
socket does not exist: %s", udsPath)
-                                       }
-                               }
-                       }
-               }
-       }
-
-       // CRITICAL: Reuse connections to avoid creating new xDS connections 
for each RPC call
-       // This prevents the RDS request loop issue and ensures stable 
connection state
-       s.connMutex.RLock()
-       conn, exists := s.connCache[req.Url]
-       s.connMutex.RUnlock()
-
-       // Check if cached connection is still valid (not closed/shutdown)
-       if exists && conn != nil {
-               state := conn.GetState()
-               if state == connectivity.Shutdown {
-                       // Connection is closed, remove from cache and create 
new one
-                       log.Printf("ForwardEcho: cached connection for %s is 
SHUTDOWN, removing from cache", req.Url)
-                       s.connMutex.Lock()
-                       delete(s.connCache, req.Url)
-                       conn = nil
-                       exists = false
-                       s.connMutex.Unlock()
-               }
+       server, err := xds.NewGRPCServer(grpc.Creds(creds))
+       if err != nil {
+               log.Fatalf("Failed to create xDS gRPC server: %v", err)
        }
 
-       if !exists || conn == nil {
-               // Create new connection
-               s.connMutex.Lock()
-               // Double-check after acquiring write lock
-               if conn, exists = s.connCache[req.Url]; !exists || conn == nil {
-                       // Create xDS client credentials
-                       creds, err := 
xdscreds.NewClientCredentials(xdscreds.ClientOptions{
-                               FallbackCreds: insecure.NewCredentials(),
-                       })
-                       if err != nil {
-                               s.connMutex.Unlock()
-                               return nil, fmt.Errorf("failed to create xDS 
client credentials: %v", err)
-                       }
-
-                       // Dial with xDS URL - use background context, not the 
request context
-                       // The request context might timeout before xDS 
configuration is received
-                       log.Printf("ForwardEcho: creating new connection for 
%s...", req.Url)
-                       conn, err = grpc.DialContext(context.Background(), 
req.Url, grpc.WithTransportCredentials(creds))
-                       if err != nil {
-                               s.connMutex.Unlock()
-                               return nil, fmt.Errorf("failed to dial %s: %v", 
req.Url, err)
-                       }
-                       s.connCache[req.Url] = conn
-                       log.Printf("ForwardEcho: cached connection for %s", 
req.Url)
-               }
-               s.connMutex.Unlock()
-       } else {
-               log.Printf("ForwardEcho: reusing cached connection for %s 
(state: %v)", req.Url, conn.GetState())
+       es := &echoServer{
+               hostname:       hostname,
+               serviceVersion: serviceVersion,
+               namespace:      namespace,
+               instanceIP:     instanceIP,
+               cluster:        cluster,
+               servicePort:    servicePort,
        }
+       pb.RegisterEchoServiceServer(server, es)
+       pb.RegisterEchoTestServiceServer(server, es)
+       // Enable reflection API for grpcurl to discover services
+       reflection.Register(server)
 
-       initialState := conn.GetState()
-       log.Printf("ForwardEcho: initial connection state: %v", initialState)
-
-       // CRITICAL: If connection is already READY, use it directly without 
waiting
-       // For cached connections, they should already be in READY state
-       if initialState == connectivity.Ready {
-               log.Printf("ForwardEcho: connection is already READY, 
proceeding with RPC calls")
-       } else {
-               // Only wait for new connections or connections that are not 
READY
-               // For gRPC xDS proxyless, we need to wait for the client to 
receive and process LDS/CDS/EDS
-               // The connection state may transition: IDLE -> CONNECTING -> 
READY (or TRANSIENT_FAILURE -> CONNECTING -> READY)
-               log.Printf("ForwardEcho: waiting for xDS configuration to be 
processed and connection to be ready (30 seconds)...")
-
-               // Wait for state changes with multiple attempts
-               maxWait := 30 * time.Second
-
-               // Wait for state changes, allowing multiple state transitions
-               // CRITICAL: Don't exit on TRANSIENT_FAILURE - it may recover 
to READY
-               stateChanged := false
-               currentState := initialState
-               startTime := time.Now()
-               lastStateChangeTime := startTime
-
-               for time.Since(startTime) < maxWait {
-                       if currentState == connectivity.Ready {
-                               log.Printf("ForwardEcho: connection is READY 
after %v", time.Since(startTime))
-                               stateChanged = true
-                               break
-                       }
-
-                       // Only exit on Shutdown, not on TransientFailure (it 
may recover)
-                       if currentState == connectivity.Shutdown {
-                               log.Printf("ForwardEcho: connection in %v state 
after %v, cannot recover", currentState, time.Since(startTime))
-                               break
-                       }
-
-                       // Wait for state change with remaining timeout
-                       remaining := maxWait - time.Since(startTime)
-                       if remaining <= 0 {
-                               break
-                       }
-
-                       // Use shorter timeout for each WaitForStateChange call 
to allow periodic checks
-                       waitTimeout := remaining
-                       if waitTimeout > 5*time.Second {
-                               waitTimeout = 5 * time.Second
-                       }
-
-                       stateCtx, stateCancel := 
context.WithTimeout(context.Background(), waitTimeout)
-                       if conn.WaitForStateChange(stateCtx, currentState) {
-                               newState := conn.GetState()
-                               elapsed := time.Since(startTime)
-                               log.Printf("ForwardEcho: connection state 
changed from %v to %v after %v", currentState, newState, elapsed)
-                               stateChanged = true
-                               currentState = newState
-                               lastStateChangeTime = time.Now()
-
-                               // If READY, we're done
-                               if newState == connectivity.Ready {
-                                       stateCancel()
-                                       break
-                               }
-
-                               // If we're in TRANSIENT_FAILURE, continue 
waiting - it may recover
-                               // gRPC xDS client will retry connection when 
endpoints become available
-                               if newState == connectivity.TransientFailure {
-                                       log.Printf("ForwardEcho: connection in 
TRANSIENT_FAILURE, continuing to wait for recovery (remaining: %v)", 
maxWait-elapsed)
-                               }
-                       } else {
-                               // Timeout waiting for state change - check if 
we should continue
-                               elapsed := time.Since(startTime)
-                               if currentState == 
connectivity.TransientFailure {
-                                       // If we've been in TRANSIENT_FAILURE 
for a while, continue waiting
-                                       // The connection may recover when 
endpoints become available
-                                       if time.Since(lastStateChangeTime) < 
10*time.Second {
-                                               log.Printf("ForwardEcho: still 
in TRANSIENT_FAILURE after %v, continuing to wait (remaining: %v)", elapsed, 
maxWait-elapsed)
-                                       } else {
-                                               log.Printf("ForwardEcho: no 
state change after %v, current state: %v (remaining: %v)", elapsed, 
currentState, maxWait-elapsed)
-                                       }
-                               } else {
-                                       log.Printf("ForwardEcho: no state 
change after %v, current state: %v (remaining: %v)", elapsed, currentState, 
maxWait-elapsed)
-                               }
-                       }
-                       stateCancel()
-               }
-
-               finalState := conn.GetState()
-               log.Printf("ForwardEcho: final connection state: %v 
(stateChanged=%v, waited=%v)", finalState, stateChanged, time.Since(startTime))
-
-               // If connection is not READY, log a warning but proceed anyway
-               // The first RPC call may trigger connection establishment
-               if finalState != connectivity.Ready {
-                       log.Printf("ForwardEcho: WARNING - connection is not 
READY (state=%v), but proceeding with RPC calls", finalState)
-               }
+       lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
+       if err != nil {
+               log.Fatalf("Failed to listen: %v", err)
        }
 
-       // Create client and make RPC calls
-       client := pb.NewEchoServiceClient(conn)
-       output := make([]string, 0, count)
-
-       log.Printf("ForwardEcho: sending %d requests...", count)
-       for i := int32(0); i < count; i++ {
-               echoReq := &pb.EchoRequest{
-                       Message: fmt.Sprintf("Request %d", i+1),
-               }
-
-               currentState := conn.GetState()
-               log.Printf("ForwardEcho: sending request %d (connection state: 
%v)...", i+1, currentState)
-
-               // Use longer timeout for first request to allow connection 
establishment
-               // For subsequent requests, use shorter timeout but still allow 
for retries
-               timeout := 30 * time.Second
-               if i > 0 {
-                       timeout = 20 * time.Second
-               }
-
-               reqCtx, reqCancel := context.WithTimeout(context.Background(), 
timeout)
-               reqStartTime := time.Now()
-               resp, err := client.Echo(reqCtx, echoReq)
-               duration := time.Since(reqStartTime)
-               reqCancel()
-
-               // Check connection state after RPC call
-               stateAfterRPC := conn.GetState()
-               log.Printf("ForwardEcho: request %d completed in %v, connection 
state: %v (was %v)", i+1, duration, stateAfterRPC, currentState)
-
-               if err != nil {
-                       log.Printf("ForwardEcho: request %d failed: %v", i+1, 
err)
-                       output = append(output, fmt.Sprintf("[%d] Error: %v", 
i, err))
-
-                       // If connection is in TRANSIENT_FAILURE, wait a bit 
before next request
-                       // to allow gRPC client to retry and recover
-                       if stateAfterRPC == connectivity.TransientFailure && i 
< count-1 {
-                               waitTime := 2 * time.Second
-                               log.Printf("ForwardEcho: connection in 
TRANSIENT_FAILURE, waiting %v before next request...", waitTime)
-                               time.Sleep(waitTime)
-
-                               // Check if connection recovered
-                               newState := conn.GetState()
-                               if newState == connectivity.Ready {
-                                       log.Printf("ForwardEcho: connection 
recovered to READY after wait")
-                               } else {
-                                       log.Printf("ForwardEcho: connection 
state after wait: %v", newState)
-                               }
-                       }
-                       continue
-               }
-
-               if resp == nil {
-                       log.Printf("ForwardEcho: request %d failed: response is 
nil", i+1)
-                       output = append(output, fmt.Sprintf("[%d] Error: 
response is nil", i))
-                       continue
-               }
-
-               log.Printf("ForwardEcho: request %d succeeded: Hostname=%s 
ServiceVersion=%s Namespace=%s IP=%s",
-                       i+1, resp.Hostname, resp.ServiceVersion, 
resp.Namespace, resp.Ip)
-
-               lineParts := []string{
-                       fmt.Sprintf("[%d body] Hostname=%s", i, resp.Hostname),
-               }
-               if resp.ServiceVersion != "" {
-                       lineParts = append(lineParts, 
fmt.Sprintf("ServiceVersion=%s", resp.ServiceVersion))
-               }
-               if resp.Namespace != "" {
-                       lineParts = append(lineParts, 
fmt.Sprintf("Namespace=%s", resp.Namespace))
-               }
-               if resp.Ip != "" {
-                       lineParts = append(lineParts, fmt.Sprintf("IP=%s", 
resp.Ip))
-               }
-               if resp.Cluster != "" {
-                       lineParts = append(lineParts, fmt.Sprintf("Cluster=%s", 
resp.Cluster))
-               }
-               if resp.ServicePort > 0 {
-                       lineParts = append(lineParts, 
fmt.Sprintf("ServicePort=%d", resp.ServicePort))
-               }
-
-               output = append(output, strings.Join(lineParts, " "))
-
-               // Small delay between successful requests to avoid 
overwhelming the server
-               if i < count-1 {
-                       time.Sleep(100 * time.Millisecond)
+       log.Printf("Starting gRPC proxyless server on port %d (hostname: %s)", 
*port, hostname)
+
+       go func() {
+               sigChan := make(chan os.Signal, 1)
+               signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+               <-sigChan
+               log.Println("Shutting down server...")
+               server.GracefulStop()
+       }()
+
+       // Serve the gRPC server
+       // Note: server.Serve returns when the listener is closed, which is 
normal during shutdown
+       // Connection reset errors are handled by the gRPC library and logged 
separately
+       if err := server.Serve(lis); err != nil {
+               // Only log as fatal if it's not a normal shutdown (listener 
closed)
+               if !strings.Contains(err.Error(), "use of closed network 
connection") {
+                       log.Fatalf("Failed to serve: %v", err)
                }
+               log.Printf("Server stopped: %v", err)
        }
-
-       log.Printf("ForwardEcho: completed %d requests", count)
-
-       return &pb.ForwardEchoResponse{
-               Output: output,
-       }, nil
 }

Reply via email to