This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new 1b7b4698 Update grpc security and traffic sample (#825)
1b7b4698 is described below
commit 1b7b469894c118b62fc0953d97cec4041affed2f
Author: mfordjody <[email protected]>
AuthorDate: Sat Nov 22 21:36:43 2025 +0800
Update grpc security and traffic sample (#825)
---
dubbod/planet/pkg/bootstrap/server.go | 9 +-
dubbod/planet/pkg/model/authentication.go | 98 ++-
dubbod/planet/pkg/model/push_context.go | 157 ++++-
dubbod/planet/pkg/model/subsetrule.go | 29 +-
dubbod/planet/pkg/networking/grpcgen/cds.go | 223 +++++-
dubbod/planet/pkg/networking/grpcgen/grpcgen.go | 33 +
dubbod/planet/pkg/networking/grpcgen/lds.go | 76 ++-
dubbod/planet/pkg/xds/cds.go | 12 +
manifests/charts/base/files/crd-all.yaml | 8 +-
.../dubbo-discovery/files/grpc-agent.yaml | 9 +
pkg/dubbo-agent/agent.go | 73 +-
samples/grpc-app/README.md | 51 +-
samples/grpc-app/grpc-app.yaml | 65 +-
tests/grpc-app/README.md | 4 +-
tests/grpc-app/consumer/main.go | 753 +++++++++++++++------
tests/grpc-app/docker/dockerfile.consumer | 3 +
tests/grpc-app/producer/main.go | 574 ++++++----------
17 files changed, 1476 insertions(+), 701 deletions(-)
diff --git a/dubbod/planet/pkg/bootstrap/server.go
b/dubbod/planet/pkg/bootstrap/server.go
index aa363d4f..d567726f 100644
--- a/dubbod/planet/pkg/bootstrap/server.go
+++ b/dubbod/planet/pkg/bootstrap/server.go
@@ -482,10 +482,11 @@ func (s *Server) initRegistryEventHandlers() {
// Log the config change
log.Infof("configHandler: %s event for %s/%s/%s", event,
configKey.Kind, configKey.Namespace, configKey.Name)
-        // CRITICAL: For SubsetRule and ServiceRoute changes, we need Full push to ensure
-        // PushContext is re-initialized and configuration is reloaded
-        // This is because these configs affect CDS/RDS generation and need complete context refresh
-        needsFullPush := configKind == kind.SubsetRule || configKind == kind.ServiceRoute
+        // CRITICAL: Some configs (SubsetRule/ServiceRoute/PeerAuthentication) require Full push to ensure
+        // PushContext is re-initialized and configuration is reloaded.
+        // PeerAuthentication must rebuild AuthenticationPolicies to enable STRICT mTLS on LDS; without
+        // a full push the cached PushContext would continue serving plaintext listeners.
+        needsFullPush := configKind == kind.SubsetRule || configKind == kind.ServiceRoute || configKind == kind.PeerAuthentication
// Trigger ConfigUpdate to push changes to all connected proxies
s.XDSServer.ConfigUpdate(&model.PushRequest{
diff --git a/dubbod/planet/pkg/model/authentication.go
b/dubbod/planet/pkg/model/authentication.go
index c0deaaea..5fa8b7dc 100644
--- a/dubbod/planet/pkg/model/authentication.go
+++ b/dubbod/planet/pkg/model/authentication.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
"istio.io/api/security/v1beta1"
+ typev1beta1 "istio.io/api/type/v1beta1"
)
type MutualTLSMode int
@@ -75,7 +76,8 @@ func (policy *AuthenticationPolicies) addPeerAuthentication(configs []config.Con
     for _, config := range configs {
         versions = append(versions, config.UID+"."+config.ResourceVersion)
         spec := config.Spec.(*v1beta1.PeerAuthentication)
-        if spec.Selector == nil || len(spec.Selector.MatchLabels) == 0 {
+        selector := spec.GetSelector()
+        if selector == nil || len(selector.MatchLabels) == 0 {
             if t, ok := seenNamespaceOrMeshConfig[config.Namespace]; ok {
                 log.Warnf(
                     "Namespace/mesh-level PeerAuthentication is already defined for %q at time %v. Ignore %q which was created at time %v",
@@ -132,3 +134,97 @@ func ConvertToMutualTLSMode(mode v1beta1.PeerAuthentication_MutualTLS_Mode) Mutu
         return MTLSUnknown
     }
 }
+
+func (policy *AuthenticationPolicies) EffectiveMutualTLSMode(namespace string, workloadLabels map[string]string, port uint32) MutualTLSMode {
+    if policy == nil {
+        return MTLSUnknown
+    }
+
+    if namespace != "" {
+        if mode := policy.matchingPeerAuthentication(namespace, workloadLabels, port); mode != MTLSUnknown {
+            return mode
+        }
+        if mode, ok := policy.namespaceMutualTLSMode[namespace]; ok && mode != MTLSUnknown {
+            return mode
+        }
+    }
+
+    if policy.rootNamespace != "" {
+        if mode := policy.matchingPeerAuthentication(policy.rootNamespace, workloadLabels, port); mode != MTLSUnknown {
+            return mode
+        }
+    }
+
+    if policy.globalMutualTLSMode != MTLSUnknown {
+        return policy.globalMutualTLSMode
+    }
+
+    return MTLSUnknown
+}
+
+func (policy *AuthenticationPolicies) matchingPeerAuthentication(namespace string, workloadLabels map[string]string, port uint32) MutualTLSMode {
+    configs := policy.peerAuthentications[namespace]
+    if len(configs) == 0 {
+        return MTLSUnknown
+    }
+
+    for _, cfg := range configs {
+        spec := cfg.Spec.(*v1beta1.PeerAuthentication)
+        if hasPeerAuthSelector(spec) && selectorMatchesWorkload(spec.GetSelector(), workloadLabels) {
+            if mode := peerAuthenticationModeForPort(spec, port); mode != MTLSUnknown {
+                return mode
+            }
+        }
+    }
+
+    for _, cfg := range configs {
+        spec := cfg.Spec.(*v1beta1.PeerAuthentication)
+        if !hasPeerAuthSelector(spec) {
+            if mode := peerAuthenticationModeForPort(spec, port); mode != MTLSUnknown {
+                return mode
+            }
+        }
+    }
+
+    return MTLSUnknown
+}
+
+func hasPeerAuthSelector(spec *v1beta1.PeerAuthentication) bool {
+    if spec == nil {
+        return false
+    }
+    selector := spec.GetSelector()
+    return selector != nil && len(selector.MatchLabels) > 0
+}
+
+func selectorMatchesWorkload(selector *typev1beta1.WorkloadSelector, workloadLabels map[string]string) bool {
+    if selector == nil || len(selector.MatchLabels) == 0 {
+        return true
+    }
+    if len(workloadLabels) == 0 {
+        return false
+    }
+    for k, v := range selector.MatchLabels {
+        if workloadLabels[k] != v {
+            return false
+        }
+    }
+    return true
+}
+
+func peerAuthenticationModeForPort(spec *v1beta1.PeerAuthentication, port uint32) MutualTLSMode {
+    if spec == nil {
+        return MTLSUnknown
+    }
+    if port != 0 && spec.PortLevelMtls != nil {
+        if mtls, ok := spec.PortLevelMtls[port]; ok && mtls != nil {
+            if mtls.Mode != v1beta1.PeerAuthentication_MutualTLS_UNSET {
+                return ConvertToMutualTLSMode(mtls.Mode)
+            }
+        }
+    }
+    if spec.Mtls != nil && spec.Mtls.Mode != v1beta1.PeerAuthentication_MutualTLS_UNSET {
+        return ConvertToMutualTLSMode(spec.Mtls.Mode)
+    }
+    return MTLSUnknown
+}
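Editor's note: the resolution order implemented by `EffectiveMutualTLSMode` above is selector-scoped policies in the workload namespace, then the namespace-wide mode, then root-namespace policies, then the mesh-wide default, with `peerAuthenticationModeForPort` letting port-level settings override the policy-wide `mtls` block. A minimal standalone sketch of that precedence (toy types only; the real method operates on `AuthenticationPolicies` and Istio `PeerAuthentication` specs):

```go
package main

import "fmt"

type mtlsMode string

const (
	unknown    mtlsMode = "UNKNOWN"
	strictMTLS mtlsMode = "STRICT"
)

// resolve mirrors the lookup order of EffectiveMutualTLSMode:
// workload-selector policy -> namespace-wide policy -> root-namespace policy -> mesh default.
func resolve(workload, namespace, rootNamespace, mesh mtlsMode) mtlsMode {
	for _, m := range []mtlsMode{workload, namespace, rootNamespace, mesh} {
		if m != unknown {
			return m
		}
	}
	return unknown
}

func main() {
	// A namespace-wide STRICT policy wins when no selector-scoped policy matches.
	fmt.Println(resolve(unknown, strictMTLS, unknown, unknown)) // STRICT
	// With nothing set anywhere, the caller falls back to MTLSUnknown (plaintext listener).
	fmt.Println(resolve(unknown, unknown, unknown, unknown)) // UNKNOWN
}
```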
diff --git a/dubbod/planet/pkg/model/push_context.go
b/dubbod/planet/pkg/model/push_context.go
index 7990cd3b..572f035b 100644
--- a/dubbod/planet/pkg/model/push_context.go
+++ b/dubbod/planet/pkg/model/push_context.go
@@ -60,20 +60,21 @@ var (
)
type PushContext struct {
- Mesh *meshconfig.MeshConfig `json:"-"`
- initializeMutex sync.Mutex
- InitDone atomic.Bool
- Networks *meshconfig.MeshNetworks
- networkMgr *NetworkManager
- clusterLocalHosts ClusterLocalHosts
- exportToDefaults exportToDefaults
- ServiceIndex serviceIndex
- serviceRouteIndex serviceRouteIndex
- subsetRuleIndex subsetRuleIndex
- serviceAccounts map[serviceAccountKey][]string
- PushVersion string
- ProxyStatus map[string]map[string]ProxyPushStatus
- proxyStatusMutex sync.RWMutex
+ Mesh *meshconfig.MeshConfig `json:"-"`
+ initializeMutex sync.Mutex
+ InitDone atomic.Bool
+ Networks *meshconfig.MeshNetworks
+ networkMgr *NetworkManager
+ clusterLocalHosts ClusterLocalHosts
+ exportToDefaults exportToDefaults
+ ServiceIndex serviceIndex
+ serviceRouteIndex serviceRouteIndex
+ subsetRuleIndex subsetRuleIndex
+ serviceAccounts map[serviceAccountKey][]string
+ AuthenticationPolicies *AuthenticationPolicies
+ PushVersion string
+ ProxyStatus map[string]map[string]ProxyPushStatus
+ proxyStatusMutex sync.RWMutex
}
type PushRequest struct {
@@ -515,6 +516,7 @@ func (ps *PushContext) createNewContext(env *Environment) {
ps.initServiceRegistry(env, nil)
ps.initServiceRoutes(env)
ps.initSubsetRules(env)
+ ps.initAuthenticationPolicies(env)
}
func (ps *PushContext) updateContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
@@ -606,6 +608,41 @@ func (ps *PushContext) updateContext(env *Environment, oldPushContext *PushConte
         ps.subsetRuleIndex = oldPushContext.subsetRuleIndex
     }
+    authnPoliciesChanged := pushReq != nil && (pushReq.Full || HasConfigsOfKind(pushReq.ConfigsUpdated, kind.PeerAuthentication))
+    if authnPoliciesChanged || oldPushContext == nil || oldPushContext.AuthenticationPolicies == nil {
+        log.Infof("updateContext: PeerAuthentication changed (full=%v, configsUpdatedContainingPeerAuth=%v), rebuilding authentication policies",
+            pushReq != nil && pushReq.Full, func() bool {
+                if pushReq == nil {
+                    return false
+                }
+                return HasConfigsOfKind(pushReq.ConfigsUpdated, kind.PeerAuthentication)
+            }())
+        ps.initAuthenticationPolicies(env)
+    } else {
+        ps.AuthenticationPolicies = oldPushContext.AuthenticationPolicies
+    }
+}
+
+func (ps *PushContext) initAuthenticationPolicies(env *Environment) {
+    if env == nil {
+        ps.AuthenticationPolicies = nil
+        return
+    }
+    ps.AuthenticationPolicies = initAuthenticationPolicies(env)
+}
+
+func (ps *PushContext) InboundMTLSModeForProxy(proxy *Proxy, port uint32) MutualTLSMode {
+    if ps == nil || proxy == nil || ps.AuthenticationPolicies == nil {
+        return MTLSUnknown
+    }
+    var namespace string
+    if proxy.Metadata != nil {
+        namespace = proxy.Metadata.Namespace
+    }
+    if namespace == "" {
+        namespace = proxy.ConfigNamespace
+    }
+    return ps.AuthenticationPolicies.EffectiveMutualTLSMode(namespace, nil, port)
 }
func (ps *PushContext) ServiceForHostname(proxy *Proxy, hostname host.Name)
*Service {
@@ -776,13 +813,22 @@ func (ps *PushContext) setSubsetRules(configs
[]config.Config) {
log.Infof("setSubsetRules: indexed %d namespaces with local rules",
len(namespaceLocalSubRules))
for ns, rules := range namespaceLocalSubRules {
totalRules := 0
- for _, ruleList := range rules.specificSubRules {
+ for hostname, ruleList := range rules.specificSubRules {
totalRules += len(ruleList)
+ // Log TLS configuration for each merged DestinationRule
+ for _, rule := range ruleList {
+ if dr, ok :=
rule.rule.Spec.(*networking.DestinationRule); ok {
+ hasTLS := dr.TrafficPolicy != nil &&
dr.TrafficPolicy.Tls != nil
+ tlsMode := "none"
+ if hasTLS {
+ tlsMode =
dr.TrafficPolicy.Tls.Mode.String()
+ }
+ log.Infof("setSubsetRules: namespace
%s, hostname %s: DestinationRule has %d subsets, TLS mode: %s",
+ ns, hostname, len(dr.Subsets),
tlsMode)
+ }
+ }
}
log.Infof("setSubsetRules: namespace %s has %d DestinationRules
with %d specific hostnames", ns, totalRules, len(rules.specificSubRules))
- for hostname := range rules.specificSubRules {
- log.Debugf("setSubsetRules: namespace %s has rules for
hostname %s", ns, hostname)
- }
}
log.Infof("setSubsetRules: indexed %d namespaces with exported rules",
len(exportedDestRulesByNamespace))
if rootNamespaceLocalDestRules != nil {
@@ -804,8 +850,12 @@ func (ps *PushContext) initSubsetRules(env *Environment) {
for i := range subRules {
subRules[i] = configs[i]
if dr, ok := configs[i].Spec.(*networking.DestinationRule); ok {
- log.Infof("initSubsetRules: SubsetRule %s/%s for host
%s with %d subsets",
- configs[i].Namespace, configs[i].Name, dr.Host,
len(dr.Subsets))
+ tlsMode := "none"
+ if dr.TrafficPolicy != nil && dr.TrafficPolicy.Tls !=
nil {
+ tlsMode = dr.TrafficPolicy.Tls.Mode.String()
+ }
+ log.Infof("initSubsetRules: SubsetRule %s/%s for host
%s with %d subsets, TLS mode: %s",
+ configs[i].Namespace, configs[i].Name, dr.Host,
len(dr.Subsets), tlsMode)
}
}
@@ -863,7 +913,13 @@ func (ps *PushContext) DestinationRuleForService(namespace
string, hostname host
if nsRules := ps.subsetRuleIndex.namespaceLocal[namespace]; nsRules !=
nil {
log.Debugf("DestinationRuleForService: checking namespace-local
rules for %s (found %d specific rules)", namespace,
len(nsRules.specificSubRules))
if dr := firstDestinationRule(nsRules, hostname); dr != nil {
- log.Infof("DestinationRuleForService: found
DestinationRule in namespace-local index for %s/%s with %d subsets", namespace,
hostname, len(dr.Subsets))
+ hasTLS := dr.TrafficPolicy != nil &&
dr.TrafficPolicy.Tls != nil
+ tlsMode := "none"
+ if hasTLS {
+ tlsMode = dr.TrafficPolicy.Tls.Mode.String()
+ }
+ log.Infof("DestinationRuleForService: found
DestinationRule in namespace-local index for %s/%s with %d subsets (has
TrafficPolicy: %v, has TLS: %v, TLS mode: %s)",
+ namespace, hostname, len(dr.Subsets),
dr.TrafficPolicy != nil, hasTLS, tlsMode)
return dr
}
} else {
@@ -874,7 +930,13 @@ func (ps *PushContext) DestinationRuleForService(namespace
string, hostname host
log.Debugf("DestinationRuleForService: checking exported rules (found
%d exported namespaces)", len(ps.subsetRuleIndex.exportedByNamespace))
for ns, exported := range ps.subsetRuleIndex.exportedByNamespace {
if dr := firstDestinationRule(exported, hostname); dr != nil {
- log.Infof("DestinationRuleForService: found
DestinationRule in exported rules from namespace %s for %s/%s with %d subsets",
ns, namespace, hostname, len(dr.Subsets))
+ hasTLS := dr.TrafficPolicy != nil &&
dr.TrafficPolicy.Tls != nil
+ tlsMode := "none"
+ if hasTLS {
+ tlsMode = dr.TrafficPolicy.Tls.Mode.String()
+ }
+ log.Infof("DestinationRuleForService: found
DestinationRule in exported rules from namespace %s for %s/%s with %d subsets
(has TrafficPolicy: %v, has TLS: %v, TLS mode: %s)",
+ ns, namespace, hostname, len(dr.Subsets),
dr.TrafficPolicy != nil, hasTLS, tlsMode)
return dr
}
}
@@ -883,7 +945,13 @@ func (ps *PushContext) DestinationRuleForService(namespace
string, hostname host
if rootRules := ps.subsetRuleIndex.rootNamespaceLocal; rootRules != nil
{
log.Debugf("DestinationRuleForService: checking root namespace
rules (found %d specific rules)", len(rootRules.specificSubRules))
if dr := firstDestinationRule(rootRules, hostname); dr != nil {
- log.Infof("DestinationRuleForService: found
DestinationRule in root namespace for %s/%s with %d subsets", namespace,
hostname, len(dr.Subsets))
+ hasTLS := dr.TrafficPolicy != nil &&
dr.TrafficPolicy.Tls != nil
+ tlsMode := "none"
+ if hasTLS {
+ tlsMode = dr.TrafficPolicy.Tls.Mode.String()
+ }
+ log.Infof("DestinationRuleForService: found
DestinationRule in root namespace for %s/%s with %d subsets (has TrafficPolicy:
%v, has TLS: %v, TLS mode: %s)",
+ namespace, hostname, len(dr.Subsets),
dr.TrafficPolicy != nil, hasTLS, tlsMode)
return dr
}
}
@@ -915,12 +983,45 @@ func firstDestinationRule(csr *consolidatedSubRules, hostname host.Name) *networ
         return nil
     }
     if rules := csr.specificSubRules[hostname]; len(rules) > 0 {
-        log.Debugf("firstDestinationRule: found %d rules for hostname %s", len(rules), hostname)
-        if dr, ok := rules[0].rule.Spec.(*networking.DestinationRule); ok {
-            log.Debugf("firstDestinationRule: successfully cast to DestinationRule for hostname %s", hostname)
-            return dr
+        log.Infof("firstDestinationRule: found %d rules for hostname %s", len(rules), hostname)
+        // CRITICAL: According to Istio behavior, multiple DestinationRules should be merged into one.
+        // The first rule should contain the merged result if merge was successful.
+        // However, if merge failed (e.g., EnableEnhancedSubsetRuleMerge is disabled),
+        // we need to check all rules and prefer the one with TLS configuration.
+        // This ensures that when multiple SubsetRules exist (e.g., one with subsets, one with TLS),
+        // we return the one that has TLS if available, or the first one otherwise.
+        var bestRule *networking.DestinationRule
+        var bestRuleHasTLS bool
+        for i, rule := range rules {
+            if dr, ok := rule.rule.Spec.(*networking.DestinationRule); ok {
+                hasTLS := dr.TrafficPolicy != nil && dr.TrafficPolicy.Tls != nil
+                if hasTLS {
+                    tlsMode := dr.TrafficPolicy.Tls.Mode
+                    tlsModeStr := dr.TrafficPolicy.Tls.Mode.String()
+                    hasTLS = (tlsMode == networking.ClientTLSSettings_ISTIO_MUTUAL || tlsModeStr == "DUBBO_MUTUAL")
+                }
+                if i == 0 {
+                    // Always use first rule as fallback
+                    bestRule = dr
+                    bestRuleHasTLS = hasTLS
+                } else if hasTLS && !bestRuleHasTLS {
+                    // Prefer rule with TLS over rule without TLS
+                    log.Infof("firstDestinationRule: found rule %d with TLS for hostname %s, preferring it over rule 0", i, hostname)
+                    bestRule = dr
+                    bestRuleHasTLS = hasTLS
+                }
+            }
+        }
+        if bestRule != nil {
+            tlsMode := "none"
+            if bestRuleHasTLS {
+                tlsMode = bestRule.TrafficPolicy.Tls.Mode.String()
+            }
+            log.Infof("firstDestinationRule: returning DestinationRule for hostname %s (has TrafficPolicy: %v, has TLS: %v, TLS mode: %s, has %d subsets)",
+                hostname, bestRule.TrafficPolicy != nil, bestRuleHasTLS, tlsMode, len(bestRule.Subsets))
+            return bestRule
         } else {
-            log.Warnf("firstDestinationRule: failed to cast rule to DestinationRule for hostname %s", hostname)
+            log.Warnf("firstDestinationRule: failed to cast any rule to DestinationRule for hostname %s", hostname)
         }
     } else {
         log.Debugf("firstDestinationRule: no specific rules found for hostname %s (available hostnames: %v)", hostname, func() []string {
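Editor's note: the net effect of the new selection loop above is that when several SubsetRules index the same hostname and merging did not fold them into one, the rule carrying ISTIO_MUTUAL/DUBBO_MUTUAL TLS is preferred over a subsets-only rule, otherwise the first rule is kept. A toy sketch of that preference (illustrative only; not the repo's types):

```go
package main

import "fmt"

type rule struct {
	name   string
	hasTLS bool // true when TrafficPolicy.Tls is ISTIO_MUTUAL/DUBBO_MUTUAL
}

// pick mirrors firstDestinationRule's fallback: keep rules[0] unless a later
// rule carries mTLS settings and the current best does not.
func pick(rules []rule) rule {
	best := rules[0]
	for _, r := range rules[1:] {
		if r.hasTLS && !best.hasTLS {
			best = r
		}
	}
	return best
}

func main() {
	rules := []rule{
		{name: "subsets-only", hasTLS: false},
		{name: "mtls-policy", hasTLS: true},
	}
	fmt.Println(pick(rules).name) // mtls-policy
}
```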
diff --git a/dubbod/planet/pkg/model/subsetrule.go
b/dubbod/planet/pkg/model/subsetrule.go
index 954b8cde..1b829318 100644
--- a/dubbod/planet/pkg/model/subsetrule.go
+++ b/dubbod/planet/pkg/model/subsetrule.go
@@ -41,6 +41,7 @@ func (ps *PushContext) mergeSubsetRule(p *consolidatedSubRules, subRuleConfig co
     }
     if mdrList, exists := subRules[resolvedHost]; exists {
+        log.Infof("mergeSubsetRule: found existing rules for host %s (count: %d)", resolvedHost, len(mdrList))
         // `appendSeparately` determines if the incoming destination rule would become a new unique entry in the processedDestRules list.
         appendSeparately := true
         for _, mdr := range mdrList {
@@ -72,6 +73,8 @@ func (ps *PushContext) mergeSubsetRule(p *consolidatedSubRules, subRuleConfig co
             // at the same time added as a unique entry in the processedDestRules.
             if bothWithoutSelector || (bothWithSelector && selectorsMatch) {
                 appendSeparately = false
+                log.Debugf("mergeSubsetRule: will merge rules for host %s (bothWithoutSelector: %v, bothWithSelector: %v, selectorsMatch: %v)",
+                    resolvedHost, bothWithoutSelector, bothWithSelector, selectorsMatch)
             }
             // Deep copy destination rule, to prevent mutate it later when merge with a new one.
@@ -96,10 +99,28 @@ func (ps *PushContext) mergeSubsetRule(p *consolidatedSubRules, subRuleConfig co
                 }
             }
-            // If there is no top level policy and the incoming rule has top level
-            // traffic policy, use the one from the incoming rule.
-            if mergedRule.TrafficPolicy == nil && rule.TrafficPolicy != nil {
-                mergedRule.TrafficPolicy = rule.TrafficPolicy
+            // Merge top-level traffic policy. Historically we only copied the first non-nil policy,
+            // which meant a later SubsetRule that supplied TLS settings was ignored once a prior
+            // rule (e.g. subsets only) existed. To match Istio's behavior and ensure Proxyless gRPC
+            // can enable mTLS after subsets are defined, allow the incoming rule to override the TLS
+            // portion even when a Common TrafficPolicy already exists.
+            if rule.TrafficPolicy != nil {
+                if mergedRule.TrafficPolicy == nil {
+                    // First rule with TrafficPolicy, copy it entirely
+                    mergedRule.TrafficPolicy = rule.TrafficPolicy
+                    log.Infof("mergeSubsetRule: copied TrafficPolicy from new rule to merged rule for host %s (has TLS: %v)",
+                        resolvedHost, rule.TrafficPolicy.Tls != nil)
+                } else {
+                    // Merge TrafficPolicy fields, with TLS settings from the latest rule taking precedence
+                    if rule.TrafficPolicy.Tls != nil {
+                        // CRITICAL: TLS settings from the latest rule always win (ISTIO_MUTUAL/DUBBO_MUTUAL)
+                        mergedRule.TrafficPolicy.Tls = rule.TrafficPolicy.Tls
+                        log.Infof("mergeSubsetRule: updated TLS settings in merged TrafficPolicy for host %s (mode: %v)",
+                            resolvedHost, rule.TrafficPolicy.Tls.Mode)
+                    }
+                    // Merge other TrafficPolicy fields if needed (loadBalancer, connectionPool, etc.)
+                    // For now, we only merge TLS as it's the critical setting for mTLS
+                }
             }
         }
         if appendSeparately {
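Editor's note: after this change a SubsetRule that only defines subsets and a later one that only defines TLS merge into a single effective DestinationRule, with the latest TLS settings winning. A simplified sketch of the intended outcome (toy types; the real merge also deduplicates subsets and honors workload selectors):

```go
package main

import "fmt"

type trafficPolicy struct{ tlsMode string }

type destinationRule struct {
	subsets []string
	policy  *trafficPolicy
}

// merge mirrors mergeSubsetRule's traffic-policy handling: the merged rule keeps its
// subsets, and the incoming rule's TLS settings initialize or overwrite the TLS portion.
func merge(merged, incoming destinationRule) destinationRule {
	merged.subsets = append(merged.subsets, incoming.subsets...)
	if incoming.policy != nil {
		if merged.policy == nil {
			merged.policy = incoming.policy
		} else if incoming.policy.tlsMode != "" {
			merged.policy.tlsMode = incoming.policy.tlsMode
		}
	}
	return merged
}

func main() {
	withSubsets := destinationRule{subsets: []string{"v1", "v2"}}
	withTLS := destinationRule{policy: &trafficPolicy{tlsMode: "ISTIO_MUTUAL"}}
	out := merge(withSubsets, withTLS)
	fmt.Println(out.subsets, out.policy.tlsMode) // [v1 v2] ISTIO_MUTUAL
}
```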
diff --git a/dubbod/planet/pkg/networking/grpcgen/cds.go
b/dubbod/planet/pkg/networking/grpcgen/cds.go
index 3efff2bb..835c27a5 100644
--- a/dubbod/planet/pkg/networking/grpcgen/cds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/cds.go
@@ -19,6 +19,7 @@ package grpcgen
import (
"fmt"
+ "strings"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@@ -29,6 +30,8 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+ tlsv3
"github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
+ networking "istio.io/api/networking/v1alpha3"
)
type clusterBuilder struct {
@@ -115,7 +118,29 @@ func newClusterBuilder(node *model.Proxy, push
*model.PushContext, defaultCluste
func (b *clusterBuilder) build() []*cluster.Cluster {
var defaultCluster *cluster.Cluster
defaultRequested := b.filter == nil ||
b.filter.Contains(b.defaultClusterName)
- if defaultRequested {
+
+ // CRITICAL: Check if DestinationRule has TLS configuration before
generating default cluster
+ // According to Istio's proxyless gRPC implementation:
+ // - DestinationRule with ISTIO_MUTUAL configures CLIENT-SIDE
(outbound) mTLS
+ // - If DestinationRule specifies ISTIO_MUTUAL, we MUST generate
default cluster and apply TLS
+ // even if it's not explicitly requested, so that clients can use
mTLS when connecting
+ // Reference:
https://istio.io/latest/blog/2021/proxyless-grpc/#enabling-mtls
+ var dr *networking.DestinationRule
+ if b.svc != nil {
+ dr =
b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.hostname)
+ if dr == nil && b.svc.Hostname != b.hostname {
+ dr =
b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.svc.Hostname)
+ }
+ }
+ hasTLSInDR := dr != nil && dr.TrafficPolicy != nil &&
dr.TrafficPolicy.Tls != nil
+ if hasTLSInDR {
+ tlsMode := dr.TrafficPolicy.Tls.Mode
+ tlsModeStr := dr.TrafficPolicy.Tls.Mode.String()
+ hasTLSInDR = (tlsMode ==
networking.ClientTLSSettings_ISTIO_MUTUAL || tlsModeStr == "DUBBO_MUTUAL")
+ }
+
+ // Generate default cluster if requested OR if DestinationRule has
ISTIO_MUTUAL TLS
+ if defaultRequested || hasTLSInDR {
defaultCluster = b.edsCluster(b.defaultClusterName)
// CRITICAL: For gRPC proxyless, we need to set CommonLbConfig
to handle endpoint health status
// Following Istio's implementation, we should include
UNHEALTHY and DRAINING endpoints
@@ -138,10 +163,20 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
core.HealthStatus_DEGRADED,
},
}
- log.Infof("clusterBuilder.build: generated default cluster %s",
b.defaultClusterName)
+ // TLS will be applied in applyDestinationRule after
DestinationRule is found
+ if hasTLSInDR {
+ log.Infof("clusterBuilder.build: generated default
cluster %s (required for ISTIO_MUTUAL TLS)", b.defaultClusterName)
+ } else {
+ log.Infof("clusterBuilder.build: generated default
cluster %s", b.defaultClusterName)
+ }
}
- subsetClusters := b.applyDestinationRule(defaultCluster)
+ subsetClusters, newDefaultCluster :=
b.applyDestinationRule(defaultCluster)
+ // If applyDestinationRule generated a new default cluster (because TLS
was found but cluster wasn't generated in build()),
+ // use it instead of the original defaultCluster
+ if newDefaultCluster != nil {
+ defaultCluster = newDefaultCluster
+ }
out := make([]*cluster.Cluster, 0, 1+len(subsetClusters))
if defaultCluster != nil {
out = append(out, defaultCluster)
@@ -171,25 +206,84 @@ func (b *clusterBuilder) edsCluster(name string)
*cluster.Cluster {
}
}
-func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster)
(subsetClusters []*cluster.Cluster) {
+func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster)
(subsetClusters []*cluster.Cluster, newDefaultCluster *cluster.Cluster) {
if b.svc == nil || b.port == nil {
log.Warnf("applyDestinationRule: service or port is nil for
%s", b.defaultClusterName)
- return nil
+ return nil, nil
}
log.Infof("applyDestinationRule: looking for DestinationRule for
service %s/%s (hostname=%s, port=%d)",
b.svc.Attributes.Namespace, b.svc.Attributes.Name, b.hostname,
b.portNum)
dr := b.push.DestinationRuleForService(b.svc.Attributes.Namespace,
b.hostname)
if dr == nil {
- log.Warnf("applyDestinationRule: no DestinationRule found for
%s/%s", b.svc.Attributes.Namespace, b.hostname)
- return nil
+ // If not found with b.hostname, try with the service's FQDN
hostname
+ if b.svc.Hostname != b.hostname {
+ dr =
b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.svc.Hostname)
+ }
+ if dr == nil {
+ log.Warnf("applyDestinationRule: no DestinationRule
found for %s/%s or %s", b.svc.Attributes.Namespace, b.hostname, b.svc.Hostname)
+ return nil, nil
+ }
}
- if len(dr.Subsets) == 0 {
- log.Warnf("applyDestinationRule: DestinationRule found for
%s/%s but has no subsets", b.svc.Attributes.Namespace, b.hostname)
- return nil
+
+ // Check if DestinationRule has TLS configuration
+ hasTLS := dr.TrafficPolicy != nil && dr.TrafficPolicy.Tls != nil
+ if hasTLS {
+ tlsMode := dr.TrafficPolicy.Tls.Mode
+ tlsModeStr := dr.TrafficPolicy.Tls.Mode.String()
+ hasTLS = (tlsMode == networking.ClientTLSSettings_ISTIO_MUTUAL
|| tlsModeStr == "DUBBO_MUTUAL")
+ }
+
+ // If no subsets and no TLS, there's nothing to do
+ if len(dr.Subsets) == 0 && !hasTLS {
+ log.Warnf("applyDestinationRule: DestinationRule found for
%s/%s but has no subsets and no TLS policy", b.svc.Attributes.Namespace,
b.hostname)
+ return nil, nil
}
- log.Infof("applyDestinationRule: found DestinationRule for %s/%s with
%d subsets, defaultCluster requested=%v",
- b.svc.Attributes.Namespace, b.hostname, len(dr.Subsets),
defaultCluster != nil)
+ log.Infof("applyDestinationRule: found DestinationRule for %s/%s with
%d subsets, defaultCluster requested=%v, hasTLS=%v",
+ b.svc.Attributes.Namespace, b.hostname, len(dr.Subsets),
defaultCluster != nil, hasTLS)
+
+ // CRITICAL: Apply TLS to default cluster if it exists and doesn't have
TransportSocket yet
+ // This ensures that default cluster gets TLS from the top-level
TrafficPolicy in DestinationRule
+ // When SubsetRule sets ISTIO_MUTUAL, inbound listener enforces STRICT
mTLS, so outbound must also use TLS
+ // NOTE: We re-check hasTLS here because firstDestinationRule might
have returned a different rule
+ // than the one checked in build(), especially when multiple
SubsetRules exist and merge failed
+ if defaultCluster != nil && defaultCluster.TransportSocket == nil {
+ // Re-check TLS in case DestinationRule was found here but not
in build()
+ recheckTLS := dr != nil && dr.TrafficPolicy != nil &&
dr.TrafficPolicy.Tls != nil
+ if recheckTLS {
+ tlsMode := dr.TrafficPolicy.Tls.Mode
+ tlsModeStr := dr.TrafficPolicy.Tls.Mode.String()
+ recheckTLS = (tlsMode ==
networking.ClientTLSSettings_ISTIO_MUTUAL || tlsModeStr == "DUBBO_MUTUAL")
+ }
+ if hasTLS || recheckTLS {
+ log.Infof("applyDestinationRule: applying TLS to
default cluster %s (DestinationRule has ISTIO_MUTUAL)", b.defaultClusterName)
+ b.applyTLSForCluster(defaultCluster, nil)
+ } else {
+ log.Debugf("applyDestinationRule: skipping TLS for
default cluster %s (DestinationRule has no TrafficPolicy or TLS)",
b.defaultClusterName)
+ }
+ } else if defaultCluster == nil && hasTLS {
+ // CRITICAL: If default cluster was not generated in build()
but DestinationRule has TLS,
+ // we need to generate it here to ensure TLS is applied
+ // This can happen if build() checked the first rule (without
TLS) but applyDestinationRule
+ // found a different rule (with TLS) via firstDestinationRule's
improved logic
+ log.Warnf("applyDestinationRule: default cluster was not
generated in build() but DestinationRule has TLS, generating it now")
+ defaultCluster = b.edsCluster(b.defaultClusterName)
+ if defaultCluster.CommonLbConfig == nil {
+ defaultCluster.CommonLbConfig =
&cluster.Cluster_CommonLbConfig{}
+ }
+ defaultCluster.CommonLbConfig.OverrideHostStatus =
&core.HealthStatusSet{
+ Statuses: []core.HealthStatus{
+ core.HealthStatus_HEALTHY,
+ core.HealthStatus_UNHEALTHY,
+ core.HealthStatus_DRAINING,
+ core.HealthStatus_UNKNOWN,
+ core.HealthStatus_DEGRADED,
+ },
+ }
+ log.Infof("applyDestinationRule: applying TLS to newly
generated default cluster %s (DestinationRule has ISTIO_MUTUAL)",
b.defaultClusterName)
+ b.applyTLSForCluster(defaultCluster, nil)
+ return nil, defaultCluster // Return the newly generated
default cluster
+ }
var commonLbConfig *cluster.Cluster_CommonLbConfig
if defaultCluster != nil {
@@ -235,9 +329,112 @@ func (b *clusterBuilder)
applyDestinationRule(defaultCluster *cluster.Cluster) (
log.Infof("applyDestinationRule: generating subset cluster %s
for subset %s", clusterName, subset.Name)
subsetCluster := b.edsCluster(clusterName)
subsetCluster.CommonLbConfig = commonLbConfig
+ b.applyTLSForCluster(subsetCluster, subset)
subsetClusters = append(subsetClusters, subsetCluster)
}
log.Infof("applyDestinationRule: generated %d subset clusters for
%s/%s", len(subsetClusters), b.svc.Attributes.Namespace, b.hostname)
- return subsetClusters
+ return subsetClusters, nil
+}
+
+// applyTLSForCluster attaches a gRPC-compatible TLS transport socket whenever
the
+// DestinationRule (or subset override) specifies ISTIO_MUTUAL/DUBBO_MUTUAL
mode.
+func (b *clusterBuilder) applyTLSForCluster(c *cluster.Cluster, subset
*networking.Subset) {
+ if c == nil || b.svc == nil {
+ return
+ }
+
+ dr := b.push.DestinationRuleForService(b.svc.Attributes.Namespace,
b.hostname)
+ if dr == nil && b.svc.Hostname != b.hostname {
+ // If not found with b.hostname, try with the service's FQDN
hostname
+ dr =
b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.svc.Hostname)
+ }
+ if dr == nil {
+ log.Warnf("applyTLSForCluster: no DestinationRule found for
cluster %s (namespace=%s, hostname=%s, service hostname=%s)",
+ c.Name, b.svc.Attributes.Namespace, b.hostname,
b.svc.Hostname)
+ return
+ }
+
+ var policy *networking.TrafficPolicy
+ if subset != nil && subset.TrafficPolicy != nil {
+ policy = subset.TrafficPolicy
+ log.Infof("applyTLSForCluster: using TrafficPolicy from subset
%s for cluster %s", subset.Name, c.Name)
+ } else {
+ policy = dr.TrafficPolicy
+ if policy != nil {
+ log.Infof("applyTLSForCluster: using top-level
TrafficPolicy for cluster %s", c.Name)
+ }
+ }
+
+ if policy == nil || policy.Tls == nil {
+ if policy == nil {
+ log.Warnf("applyTLSForCluster: no TrafficPolicy found
in DestinationRule for cluster %s", c.Name)
+ } else {
+ log.Warnf("applyTLSForCluster: no TLS settings in
TrafficPolicy for cluster %s", c.Name)
+ }
+ return
+ }
+
+ mode := policy.Tls.Mode
+ modeStr := policy.Tls.Mode.String()
+ if mode != networking.ClientTLSSettings_ISTIO_MUTUAL && modeStr !=
"DUBBO_MUTUAL" {
+ log.Debugf("applyTLSForCluster: TLS mode %v (%s) not supported
for gRPC proxyless, skipping", mode, modeStr)
+ return
+ }
+
+ tlsContext := b.buildUpstreamTLSContext(c, policy.Tls)
+ if tlsContext == nil {
+ log.Warnf("applyTLSForCluster: failed to build TLS context for
cluster %s", c.Name)
+ return
+ }
+
+ // Log SNI configuration for debugging
+ sni := tlsContext.Sni
+ if sni == "" {
+ log.Warnf("applyTLSForCluster: SNI is empty for cluster %s,
this may cause TLS handshake failures", c.Name)
+ } else {
+ log.Debugf("applyTLSForCluster: using SNI=%s for cluster %s",
sni, c.Name)
+ }
+
+ c.TransportSocket = &core.TransportSocket{
+ Name: "envoy.transport_sockets.tls",
+ ConfigType: &core.TransportSocket_TypedConfig{TypedConfig:
protoconv.MessageToAny(tlsContext)},
+ }
+ log.Infof("applyTLSForCluster: applied %v TLS transport socket to
cluster %s (SNI=%s)", mode, c.Name, sni)
+}
+
+// buildUpstreamTLSContext builds an UpstreamTlsContext that conforms to gRPC
xDS expectations,
+// reusing the common certificate-provider setup from buildCommonTLSContext.
+func (b *clusterBuilder) buildUpstreamTLSContext(c *cluster.Cluster,
tlsSettings *networking.ClientTLSSettings) *tlsv3.UpstreamTlsContext {
+ common := buildCommonTLSContext()
+ if common == nil {
+ return nil
+ }
+
+ tlsContext := &tlsv3.UpstreamTlsContext{
+ CommonTlsContext: common,
+ Sni: tlsSettings.GetSni(),
+ }
+ // CRITICAL: SNI must be the service hostname, not the cluster name
+ // Cluster name format: outbound|port|subset|hostname
+ // We need to extract the hostname from the cluster name or use the
service hostname
+ if tlsContext.Sni == "" {
+ if b.svc != nil && b.svc.Hostname != "" {
+ tlsContext.Sni = string(b.svc.Hostname)
+ } else {
+ // Fallback: try to extract hostname from cluster name
+ // Cluster name format: outbound|port|subset|hostname
+ parts := strings.Split(c.Name, "|")
+ if len(parts) >= 4 {
+ tlsContext.Sni = parts[3]
+ } else {
+ // Last resort: use cluster name (not ideal but
better than empty)
+ tlsContext.Sni = c.Name
+ log.Warnf("buildUpstreamTLSContext: using
cluster name as SNI fallback for %s (should be service hostname)", c.Name)
+ }
+ }
+ }
+ // Proxyless gRPC always speaks HTTP/2, advertise h2 via ALPN.
+ tlsContext.CommonTlsContext.AlpnProtocols = []string{"h2"}
+ return tlsContext
}
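Editor's note: the SNI fallback above matters because gRPC proxyless clusters are named `outbound|port|subset|hostname`. The following standalone sketch shows the same derivation chain (hypothetical helper name, not part of the package):

```go
package main

import (
	"fmt"
	"strings"
)

// sniFor mirrors buildUpstreamTLSContext's fallback chain for the SNI value:
// explicit ClientTLSSettings.Sni -> service hostname -> hostname segment of an
// "outbound|port|subset|hostname" cluster name -> the raw cluster name.
func sniFor(explicit, serviceHostname, clusterName string) string {
	if explicit != "" {
		return explicit
	}
	if serviceHostname != "" {
		return serviceHostname
	}
	if parts := strings.Split(clusterName, "|"); len(parts) >= 4 {
		return parts[3]
	}
	return clusterName
}

func main() {
	fmt.Println(sniFor("", "", "outbound|7070|v1|producer.grpc-app.svc.cluster.local"))
	// Output: producer.grpc-app.svc.cluster.local
}
```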
diff --git a/dubbod/planet/pkg/networking/grpcgen/grpcgen.go
b/dubbod/planet/pkg/networking/grpcgen/grpcgen.go
index c4544aa0..f09cd3d4 100644
--- a/dubbod/planet/pkg/networking/grpcgen/grpcgen.go
+++ b/dubbod/planet/pkg/networking/grpcgen/grpcgen.go
@@ -21,6 +21,7 @@ import (
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
v3 "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/xds/v3"
dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
+ tlsv3
"github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
)
var log = dubbolog.RegisterScope("grpcgen", "xDS Generator for Proxyless gRPC")
@@ -51,3 +52,35 @@ func (g *GrpcConfigGenerator) Generate(proxy *model.Proxy, w
*model.WatchedResou
return nil, model.DefaultXdsLogDetails, nil
}
+
+// buildCommonTLSContext creates a TLS context that matches gRPC xDS expectations.
+// It is adapted from Istio's buildCommonTLSContext implementation, but kept minimal:
+// - Uses certificate provider "default" for workload certs and root CA
+// - Does not configure explicit SAN matches (left to future hardening)
+func buildCommonTLSContext() *tlsv3.CommonTlsContext {
+    return &tlsv3.CommonTlsContext{
+        // Workload certificate provider instance (SPIFFE workload cert chain)
+        TlsCertificateCertificateProviderInstance: &tlsv3.CommonTlsContext_CertificateProviderInstance{
+            InstanceName:    "default",
+            CertificateName: "default",
+        },
+        // Root CA provider instance
+        ValidationContextType: &tlsv3.CommonTlsContext_CombinedValidationContext{
+            CombinedValidationContext: &tlsv3.CommonTlsContext_CombinedCertificateValidationContext{
+                ValidationContextCertificateProviderInstance: &tlsv3.CommonTlsContext_CertificateProviderInstance{
+                    InstanceName:    "default",
+                    CertificateName: "ROOTCA",
+                },
+                // DefaultValidationContext: Configure basic certificate validation
+                // The certificate provider instance (ROOTCA) provides the root CA for validation
+                // For gRPC proxyless, we rely on the certificate provider for root CA validation
+                // SAN matching can be added later if needed for stricter validation
+                DefaultValidationContext: &tlsv3.CertificateValidationContext{
+                    // Trust the root CA from the certificate provider
+                    // The certificate provider instance "default" with "ROOTCA" will provide
+                    // the root CA certificates for validating peer certificates
+                },
+            },
+        },
+    }
+}
diff --git a/dubbod/planet/pkg/networking/grpcgen/lds.go
b/dubbod/planet/pkg/networking/grpcgen/lds.go
index bd98e07f..015681ec 100644
--- a/dubbod/planet/pkg/networking/grpcgen/lds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/lds.go
@@ -35,7 +35,9 @@ import (
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
routerv3
"github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
hcmv3
"github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
+ tlsv3
"github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
)
type listenerNames map[string]listenerName
@@ -197,6 +199,16 @@ func buildInboundListeners(node *model.Proxy, push
*model.PushContext, names []s
continue
}
+ // According to Istio's proxyless gRPC implementation:
+ // - DestinationRule with ISTIO_MUTUAL only configures
CLIENT-SIDE (outbound) mTLS
+ // - PeerAuthentication with STRICT configures SERVER-SIDE
(inbound) mTLS
+ // Both are REQUIRED for mTLS to work. Server-side mTLS should
ONLY be controlled by PeerAuthentication.
+ // Reference:
https://istio.io/latest/blog/2021/proxyless-grpc/#enabling-mtls
+ mode := push.InboundMTLSModeForProxy(node, uint32(listenPort))
+ if mode == model.MTLSPermissive {
+ log.Warnf("buildInboundListeners: PERMISSIVE mTLS is
not supported for proxyless gRPC; defaulting to plaintext on listener %s", name)
+ }
+
// For proxyless gRPC inbound listeners, we need a FilterChain
with HttpConnectionManager filter
// to satisfy gRPC client requirements. According to grpc-go
issue #7691 and the error
// "missing HttpConnectionManager filter", gRPC proxyless
clients require HttpConnectionManager
@@ -238,6 +250,32 @@ func buildInboundListeners(node *model.Proxy, push
*model.PushContext, names []s
},
}
+ filterChain := &listener.FilterChain{
+ Filters: []*listener.Filter{
+ {
+ Name: wellknown.HTTPConnectionManager,
+ ConfigType:
&listener.Filter_TypedConfig{
+ TypedConfig:
protoconv.MessageToAny(hcm),
+ },
+ },
+ },
+ }
+
+ if ts := buildDownstreamTransportSocket(mode); ts != nil {
+ // For STRICT mTLS mode, set TransportSocket to require
TLS connections
+ // When TransportSocket is present, only TLS
connections can match this FilterChain
+ // No FilterChainMatch needed - gRPC proxyless will
automatically match based on TransportSocket presence
+ filterChain.TransportSocket = ts
+ log.Infof("buildInboundListeners: applied STRICT mTLS
transport socket to listener %s (mode=%v, requires client cert=true)", name,
mode)
+ } else if mode == model.MTLSStrict {
+ log.Warnf("buildInboundListeners: expected to enable
STRICT mTLS on listener %s but failed to build transport socket (mode=%v)",
name, mode)
+ } else {
+ // For plaintext mode, no TransportSocket means only
plaintext connections can match
+ // No FilterChainMatch needed - gRPC proxyless will
automatically match based on TransportSocket absence
+ // TLS connections will fail to match this FilterChain
(no TransportSocket) and connection will fail
+ log.Debugf("buildInboundListeners: listener %s using
plaintext (mode=%v) - clients using TLS will fail to connect", name, mode)
+ }
+
ll := &listener.Listener{
Name: name,
Address: &core.Address{Address:
&core.Address_SocketAddress{
@@ -249,18 +287,7 @@ func buildInboundListeners(node *model.Proxy, push
*model.PushContext, names []s
},
}},
// Create FilterChain with HttpConnectionManager filter
for proxyless gRPC
- FilterChains: []*listener.FilterChain{
- {
- Filters: []*listener.Filter{
- {
- Name:
wellknown.HTTPConnectionManager,
- ConfigType:
&listener.Filter_TypedConfig{
- TypedConfig:
protoconv.MessageToAny(hcm),
- },
- },
- },
- },
- },
+ FilterChains: []*listener.FilterChain{filterChain},
// the following must not be set or the client will NACK
ListenerFilters: nil,
UseOriginalDst: nil,
@@ -279,6 +306,31 @@ func buildInboundListeners(node *model.Proxy, push
*model.PushContext, names []s
return out
}
+func buildDownstreamTransportSocket(mode model.MutualTLSMode) *core.TransportSocket {
+    if mode != model.MTLSStrict {
+        return nil
+    }
+    common := buildCommonTLSContext()
+    if common == nil {
+        return nil
+    }
+    common.AlpnProtocols = []string{"h2"}
+
+    // For STRICT mTLS, we require client certificates and validate them
+    // The validation context is already configured in buildCommonTLSContext
+    // via the certificate provider instance (ROOTCA)
+    tlsContext := &tlsv3.DownstreamTlsContext{
+        CommonTlsContext:         common,
+        RequireClientCertificate: wrapperspb.Bool(true),
+        // Note: gRPC proxyless uses certificate provider for validation
+        // The ValidationContextType in CommonTlsContext handles client cert validation
+    }
+    return &core.TransportSocket{
+        Name:       "envoy.transport_sockets.tls",
+        ConfigType: &core.TransportSocket_TypedConfig{TypedConfig: protoconv.MessageToAny(tlsContext)},
+    }
+}
+
func buildOutboundListeners(node *model.Proxy, push *model.PushContext, filter
listenerNames) model.Resources {
out := make(model.Resources, 0, len(filter))
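Editor's note: summarizing the inbound side, the listener only gets a downstream TLS transport socket when the effective PeerAuthentication mode is STRICT; PERMISSIVE is logged as unsupported and the listener stays plaintext. A compact sketch of that decision (toy enum, not the `model` package types):

```go
package main

import "fmt"

type mtlsMode int

const (
	mtlsUnknown mtlsMode = iota
	mtlsDisable
	mtlsPermissive
	mtlsStrict
)

// inboundSecurity mirrors the listener-side decision in buildInboundListeners:
// only STRICT yields a TLS transport socket; PERMISSIVE is not supported for
// proxyless gRPC and falls back to plaintext with a warning.
func inboundSecurity(mode mtlsMode) string {
	switch mode {
	case mtlsStrict:
		return "tls transport socket, RequireClientCertificate=true"
	case mtlsPermissive:
		return "plaintext (PERMISSIVE unsupported, warning logged)"
	default:
		return "plaintext"
	}
}

func main() {
	for _, m := range []mtlsMode{mtlsUnknown, mtlsPermissive, mtlsStrict} {
		fmt.Println(inboundSecurity(m))
	}
}
```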
diff --git a/dubbod/planet/pkg/xds/cds.go b/dubbod/planet/pkg/xds/cds.go
index 621e1e21..1cc35be7 100644
--- a/dubbod/planet/pkg/xds/cds.go
+++ b/dubbod/planet/pkg/xds/cds.go
@@ -20,6 +20,7 @@ package xds
import (
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/networking/core"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
)
type CdsGenerator struct {
@@ -33,6 +34,17 @@ func cdsNeedsPush(req *model.PushRequest, proxy
*model.Proxy) (*model.PushReques
return req, res
}
+ // CRITICAL: According to Istio proxyless gRPC behavior, when
SubsetRule (DestinationRule) is created/updated
+ // with TLS configuration (ISTIO_MUTUAL), CDS must be pushed to update
cluster TransportSocket.
+ // Even if req.Full is false, we need to check if SubsetRule was
updated, as it affects cluster TLS config.
+ if req != nil && req.ConfigsUpdated != nil {
+ // Check if SubsetRule was updated - this requires CDS push to
update cluster TransportSocket
+ if model.HasConfigsOfKind(req.ConfigsUpdated, kind.SubsetRule) {
+ log.Debugf("cdsNeedsPush: SubsetRule updated, CDS push
required to update cluster TLS config")
+ return req, true
+ }
+ }
+
if !req.Full {
return req, false
}
diff --git a/manifests/charts/base/files/crd-all.yaml
b/manifests/charts/base/files/crd-all.yaml
index 9f82a97e..96333392 100644
--- a/manifests/charts/base/files/crd-all.yaml
+++ b/manifests/charts/base/files/crd-all.yaml
@@ -90,12 +90,12 @@ spec:
description: |-
Indicates whether connections to this port
should be secured using TLS.
- Valid Options: DISABLE, SIMPLE, MUTUAL,
DUBBO_MUTUAL
+ Valid Options: DISABLE, SIMPLE, MUTUAL,
ISTIO_MUTUAL
enum:
- DISABLE
- SIMPLE
- MUTUAL
- - DUBBO_MUTUAL
+ - ISTIO_MUTUAL
type: string
type: object
type: object
@@ -114,12 +114,12 @@ spec:
description: |-
Indicates whether connections to this port should
be secured using TLS.
- Valid Options: DISABLE, SIMPLE, MUTUAL,
DUBBO_MUTUAL
+ Valid Options: DISABLE, SIMPLE, MUTUAL,
ISTIO_MUTUAL
enum:
- DISABLE
- SIMPLE
- MUTUAL
- - DUBBO_MUTUAL
+ - ISTIO_MUTUAL
type: string
type: object
type: object
diff --git
a/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
index bc0264d7..b8e63967 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
@@ -17,6 +17,11 @@ metadata:
annotations:
dubbo.apache.org/rev: default
spec:
+  # NOTE:
+  # Application containers that rely on this template must mount the same "dubbo-data"
+  # volume at /var/lib/dubbo/data and set GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT=true
+  # in order for proxyless gRPC security (mTLS) to pick up the certificates generated
+  # by the dubbo-proxy sidecar.
containers:
- name: dubbo-proxy
image: "{{ $.ProxyImage }}"
@@ -57,6 +62,10 @@ spec:
value: dubbod
- name: DUBBO_META_CLUSTER_ID
value: Kubernetes
+ - name: DUBBO_META_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
- name: CA_ADDR
value: dubbod.dubbo-system.svc:15012
- name: PROXY_CONFIG
diff --git a/pkg/dubbo-agent/agent.go b/pkg/dubbo-agent/agent.go
index 8e560d84..1fb56210 100644
--- a/pkg/dubbo-agent/agent.go
+++ b/pkg/dubbo-agent/agent.go
@@ -35,6 +35,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/model"
"github.com/apache/dubbo-kubernetes/dubbod/security/pkg/nodeagent/cache"
+ "github.com/apache/dubbo-kubernetes/pkg/backoff"
"github.com/apache/dubbo-kubernetes/pkg/bootstrap"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/dubbo-agent/grpcxds"
@@ -142,7 +143,7 @@ func (a *Agent) Run(ctx context.Context) (func(), error) {
}
log.Info("Starting default Dubbo SDS Server")
- err = a.initSdsServer()
+ err = a.initSdsServer(ctx)
if err != nil {
return nil, fmt.Errorf("failed to start default Dubbo SDS
server: %v", err)
}
@@ -219,19 +220,6 @@ func (a *Agent) Run(ctx context.Context) (func(), error) {
}
}()
- // Wait for certificate generation to complete before establishing
preemptive connection.
- // This ensures certificate logs appear after "Opening status port" but
before "Establishing preemptive connection".
- // The SDS service generates certificates asynchronously in
newSDSService.
- // We wait a short time for the SDS service's async generation to
complete and log.
- // This avoids calling GenerateSecret ourselves, which would cause
duplicate generation and logs.
- if a.secretCache != nil && !a.secOpts.FileMountedCerts &&
!a.secOpts.ServeOnlyFiles {
- // Give SDS service's async certificate generation goroutine a
chance to complete.
- // The SDS service starts generating certificates immediately
after initSdsServer,
- // so a short wait is usually sufficient for the certificates
to be generated and logged.
- // This avoids the need to call GenerateSecret ourselves, which
would cause duplicate logs.
- time.Sleep(300 * time.Millisecond)
- }
-
// Now set bootstrap node to trigger preemptive connection.
// This ensures preemptive connection logs appear after certificate
logs.
if bootstrapNode != nil && a.xdsProxy != nil {
@@ -383,7 +371,7 @@ func (a *Agent) startFileWatcher(ctx context.Context,
filePath string, handler f
}
}
-func (a *Agent) initSdsServer() error {
+func (a *Agent) initSdsServer(ctx context.Context) error {
var err error
if
security.CheckWorkloadCertificate(security.WorkloadIdentityCertChainPath,
security.WorkloadIdentityKeyPath, security.WorkloadIdentityRootCertPath) {
log.Info("workload certificate files detected, creating secret
manager without caClient")
@@ -401,9 +389,7 @@ func (a *Agent) initSdsServer() error {
pkpConf := a.proxyConfig.GetPrivateKeyProvider()
a.sdsServer = a.cfg.SDSFactory(a.secOpts, a.secretCache, pkpConf)
- a.secretCache.RegisterSecretHandler(a.sdsServer.OnSecretUpdate)
-
- return nil
+ return a.registerSecretHandler(ctx)
}
func (a *Agent) rebuildSDSWithNewCAClient() {
@@ -426,8 +412,55 @@ func (a *Agent) rebuildSDSWithNewCAClient() {
a.secretCache = sc
pkpConf := a.proxyConfig.GetPrivateKeyProvider()
a.sdsServer = a.cfg.SDSFactory(a.secOpts, a.secretCache, pkpConf)
- a.secretCache.RegisterSecretHandler(a.sdsServer.OnSecretUpdate)
- log.Info("SDS server and CA client rebuilt successfully")
+ if err := a.registerSecretHandler(context.Background()); err != nil {
+ log.Errorf("failed to refresh workload certificates after CA
rebuild: %v", err)
+ } else {
+ log.Info("SDS server and CA client rebuilt successfully")
+ }
+}
+
+func (a *Agent) registerSecretHandler(ctx context.Context) error {
+ if a.secretCache == nil {
+ return nil
+ }
+ handler := func(resourceName string) {
+ if a.sdsServer != nil {
+ a.sdsServer.OnSecretUpdate(resourceName)
+ }
+ if resourceName == security.WorkloadKeyCertResourceName ||
resourceName == security.RootCertReqResourceName {
+ go func() {
+ if err :=
a.ensureWorkloadCertificates(context.Background()); err != nil {
+ log.Warnf("failed to refresh workload
certificates after %s update: %v", resourceName, err)
+ }
+ }()
+ }
+ }
+ a.secretCache.RegisterSecretHandler(handler)
+ return a.ensureWorkloadCertificates(ctx)
+}
+
+func (a *Agent) ensureWorkloadCertificates(ctx context.Context) error {
+ if a.secretCache == nil || a.secOpts == nil ||
a.secOpts.OutputKeyCertToDir == "" {
+ // Nothing to write
+ return nil
+ }
+ generate := func(resource string) error {
+ b := backoff.NewExponentialBackOff(backoff.DefaultOption())
+ return b.RetryWithContext(ctx, func() error {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ _, err := a.secretCache.GenerateSecret(resource)
+ if err != nil {
+ log.Warnf("failed to generate %s: %v",
resource, err)
+ }
+ return err
+ })
+ }
+ if err := generate(security.WorkloadKeyCertResourceName); err != nil {
+ return err
+ }
+ return generate(security.RootCertReqResourceName)
}
func (a *Agent) generateGRPCBootstrapWithNode() (*model.Node, error) {
diff --git a/samples/grpc-app/README.md b/samples/grpc-app/README.md
index d28d0b0c..15629269 100644
--- a/samples/grpc-app/README.md
+++ b/samples/grpc-app/README.md
@@ -5,8 +5,8 @@ This example demonstrates how to deploy gRPC applications with
proxyless service
## Overview
This sample includes:
-- **Consumer**: A gRPC server that receives requests (port 17070)
-- **Producer**: A gRPC client that sends requests to the consumer service
+- **Producer**: A gRPC server that receives requests (port 17070) and is
deployed with multiple versions (v1/v2) to showcase gray release scenarios.
+- **Consumer**: A gRPC client that sends requests to the producer service and
exposes a test server (port 17171) for driving traffic via `grpcurl`.
Both services use native gRPC xDS clients to connect to the Dubbo control
plane through the `dubbo-proxy` sidecar, enabling service discovery, load
balancing, and traffic management without requiring Envoy proxy for application
traffic.
@@ -49,22 +49,35 @@ kubectl get svc -n grpc-app
- `inject.dubbo.apache.org/templates: grpc-agent` - Uses the grpc-agent
template
- `proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts": true}'`
- Ensures proxy starts before application
-### Environment Variables
+### Security requirements
-The `dubbo-proxy` sidecar automatically:
-- Creates `/etc/dubbo/proxy/grpc-bootstrap.json` bootstrap file
-- Exposes xDS proxy via Unix Domain Socket
-- Sets `GRPC_XDS_BOOTSTRAP` environment variable for application containers
+When mTLS is enabled (`SubsetRule` with `ISTIO_MUTUAL` or `PeerAuthentication
STRICT`), **both the producer and consumer application containers must**:
+
+1. Mount the certificate output directory that `dubbo-proxy` writes to:
+ ```yaml
+ volumeMounts:
+ - name: dubbo-data
+ mountPath: /var/lib/dubbo/data
+ ```
+ The `grpc-agent` template already provides the `dubbo-data` volume;
mounting it from the app container makes the generated `cert-chain.pem`,
`key.pem`, and `root-cert.pem` visible to gRPC.
+
+2. Set `GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT=true` so the gRPC runtime
actually consumes the xDS security config instead of falling back to plaintext.
The sample manifest in `grpc-app.yaml` shows the required environment variable.
+
+3. When testing with `grpcurl`, export the same variable before issuing TLS
requests:
+ ```bash
+ export GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT=true
+ grpcurl -d
'{"url":"xds:///producer.grpc-app.svc.cluster.local:7070","count":5}'
localhost:17171 echo.EchoTestService/ForwardEcho
+ ```
## Testing
### Test with grpcurl
-1. Port forward to producer service:
+1. Port forward to the consumer test server:
```bash
kubectl port-forward -n grpc-app \
- $(kubectl get pod -l app=producer -n grpc-app -o
jsonpath='{.items[0].metadata.name}') \
+ $(kubectl get pod -l app=consumer -n grpc-app -o
jsonpath='{.items[0].metadata.name}') \
17171:17171 &
```
@@ -72,7 +85,7 @@ kubectl port-forward -n grpc-app \
```bash
grpcurl -plaintext -d '{
- "url": "xds:///consumer.grpc-app.svc.cluster.local:7070",
+ "url": "xds:///producer.grpc-app.svc.cluster.local:7070",
"count": 5
}' localhost:17171 echo.EchoTestService/ForwardEcho
```
@@ -81,11 +94,11 @@ Expected output:
```json
{
"output": [
- "[0 body] Hostname=consumer-xxx",
- "[1 body] Hostname=consumer-yyy",
- "[2 body] Hostname=consumer-xxx",
- "[3 body] Hostname=consumer-yyy",
- "[4 body] Hostname=consumer-xxx"
+ "[0 body] Hostname=producer-xxx",
+ "[1 body] Hostname=producer-yyy",
+ "[2 body] Hostname=producer-xxx",
+ "[3 body] Hostname=producer-yyy",
+ "[4 body] Hostname=producer-xxx"
]
}
```
@@ -93,14 +106,14 @@ Expected output:
### Check Logs
```bash
-# Consumer logs
-kubectl logs -f -l app=consumer -n grpc-app -c app
-
# Producer logs
kubectl logs -f -l app=producer -n grpc-app -c app
+# Consumer logs
+kubectl logs -f -l app=consumer -n grpc-app -c app
+
# Proxy sidecar logs
-kubectl logs -f -l app=consumer -n grpc-app -c dubbo-proxy
+kubectl logs -f -l app=producer -n grpc-app -c dubbo-proxy
```
## Troubleshooting
diff --git a/samples/grpc-app/grpc-app.yaml b/samples/grpc-app/grpc-app.yaml
index cb3bef55..6877235d 100644
--- a/samples/grpc-app/grpc-app.yaml
+++ b/samples/grpc-app/grpc-app.yaml
@@ -17,12 +17,12 @@ apiVersion: v1
kind: Service
metadata:
labels:
- app: consumer
- name: consumer
+ app: producer
+ name: producer
namespace: grpc-app
spec:
selector:
- app: consumer
+ app: producer
type: ClusterIP
ports:
- name: grpc
@@ -32,13 +32,13 @@ spec:
apiVersion: apps/v1
kind: Deployment
metadata:
- name: consumer-v1
+ name: producer-v1
namespace: grpc-app
spec:
- replicas: 2
+ replicas: 1
selector:
matchLabels:
- app: consumer
+ app: producer
version: v1
template:
metadata:
@@ -47,18 +47,20 @@ spec:
inject.dubbo.apache.org/templates: grpc-agent
proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts":
true}'
labels:
- app: consumer
+ app: producer
version: v1
spec:
containers:
- name: app
- image: mfordjody/grpc-consumer:dev-debug
+ image: mfordjody/grpc-producer:dev-debug
imagePullPolicy: Always
ports:
- containerPort: 17070
protocol: TCP
name: grpc
env:
+ - name: GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT
+ value: "true"
- name: INSTANCE_IP
valueFrom:
fieldRef:
@@ -74,6 +76,9 @@ spec:
fieldPath: metadata.namespace
- name: SERVICE_PORT
value: "17070"
+ volumeMounts:
+ - mountPath: /var/lib/dubbo/data
+ name: dubbo-data
readinessProbe:
tcpSocket:
port: 17070
@@ -90,17 +95,20 @@ spec:
timeoutSeconds: 2
successThreshold: 1
failureThreshold: 3
+ volumes:
+ - name: dubbo-data
+ emptyDir: {}
---
apiVersion: apps/v1
kind: Deployment
metadata:
- name: consumer-v2
+ name: producer-v2
namespace: grpc-app
spec:
- replicas: 2
+ replicas: 1
selector:
matchLabels:
- app: consumer
+ app: producer
version: v2
template:
metadata:
@@ -109,18 +117,20 @@ spec:
inject.dubbo.apache.org/templates: grpc-agent
proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts":
true}'
labels:
- app: consumer
+ app: producer
version: v2
spec:
containers:
- name: app
- image: mfordjody/grpc-consumer:dev-debug
+ image: mfordjody/grpc-producer:dev-debug
imagePullPolicy: Always
ports:
- containerPort: 17070
protocol: TCP
name: grpc
env:
+ - name: GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT
+ value: "true"
- name: INSTANCE_IP
valueFrom:
fieldRef:
@@ -136,6 +146,9 @@ spec:
fieldPath: metadata.namespace
- name: SERVICE_PORT
value: "17070"
+ volumeMounts:
+ - mountPath: /var/lib/dubbo/data
+ name: dubbo-data
readinessProbe:
tcpSocket:
port: 17070
@@ -152,17 +165,20 @@ spec:
timeoutSeconds: 2
successThreshold: 1
failureThreshold: 3
+ volumes:
+ - name: dubbo-data
+ emptyDir: {}
---
apiVersion: apps/v1
kind: Deployment
metadata:
- name: producer
+ name: consumer
namespace: grpc-app
spec:
replicas: 1
selector:
matchLabels:
- app: producer
+ app: consumer
template:
metadata:
annotations:
@@ -170,23 +186,28 @@ spec:
inject.dubbo.apache.org/templates: grpc-agent
proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts":
true}'
labels:
- app: producer
+ app: consumer
spec:
containers:
- name: app
- # Replace with your own image containing the producer binary
- image: mfordjody/grpc-producer:dev-debug
+ # Replace with your own image containing the consumer binary
+ image: mfordjody/grpc-consumer:dev-debug
imagePullPolicy: Always
- # Optional: uncomment to test direct connection
- # - --target=xds:///consumer.grpc-app.svc.cluster.local:7070
- # - --count=10
ports:
- containerPort: 17171
protocol: TCP
name: grpc-test
env:
+ - name: GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT
+ value: "true"
- name: INSTANCE_IP
valueFrom:
fieldRef:
apiVersion: v1
- fieldPath: status.podIP
\ No newline at end of file
+ fieldPath: status.podIP
+ volumeMounts:
+ - mountPath: /var/lib/dubbo/data
+ name: dubbo-data
+ volumes:
+ - name: dubbo-data
+ emptyDir: {}
\ No newline at end of file
diff --git a/tests/grpc-app/README.md b/tests/grpc-app/README.md
index adcff343..f8f9098f 100644
--- a/tests/grpc-app/README.md
+++ b/tests/grpc-app/README.md
@@ -4,8 +4,8 @@ This is a test example for gRPC proxyless service mesh based on
[Istio's blog po
## Architecture
-- **Consumer**: gRPC server with xDS support (port 17070)
-- **Producer**: gRPC client with xDS support + test server (port 17171)
+- **Producer**: gRPC server with xDS support (port 17070). This service is
deployed with multiple versions (v1/v2) to demonstrate gray
release/traffic-splitting scenarios and exposes gRPC reflection so `grpcurl`
can query it directly.
+- **Consumer**: gRPC client with xDS support + test server (port 17171). This
component drives load toward the producer service for automated tests.
Both services use `dubbo-proxy` sidecar as an xDS proxy to connect to the
control plane. The sidecar runs an xDS proxy server that listens on a Unix
Domain Socket (UDS) at `/etc/dubbo/proxy/XDS`. The gRPC applications connect to
this xDS proxy via the UDS socket using the `GRPC_XDS_BOOTSTRAP` environment
variable.
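
For reference, here is a minimal proxyless client sketch (not part of this commit) of the pattern described above: the application points `GRPC_XDS_BOOTSTRAP` at the sidecar-generated bootstrap file and dials an `xds:///` target with xDS credentials that fall back to plaintext until the control plane pushes TLS configuration. The target hostname and port below are illustrative placeholders, not values defined by this commit.

package main

import (
	"context"
	"log"
	"os"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	xdscreds "google.golang.org/grpc/credentials/xds"
	_ "google.golang.org/grpc/xds" // registers the xds:/// resolver
)

func main() {
	// The dubbo-proxy sidecar generates this file; gRPC's xDS client reads it via GRPC_XDS_BOOTSTRAP.
	if os.Getenv("GRPC_XDS_BOOTSTRAP") == "" {
		os.Setenv("GRPC_XDS_BOOTSTRAP", "/etc/dubbo/proxy/grpc-bootstrap.json")
	}

	creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
		FallbackCreds: insecure.NewCredentials(), // plaintext until xDS supplies TLS config
	})
	if err != nil {
		log.Fatalf("xDS client credentials: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	// Illustrative target; the real host/port depend on the deployed Service.
	conn, err := grpc.DialContext(ctx, "xds:///producer.grpc-app.svc.cluster.local:7070",
		grpc.WithTransportCredentials(creds))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()
	log.Printf("connected via xDS, state: %v", conn.GetState())
}
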
diff --git a/tests/grpc-app/consumer/main.go b/tests/grpc-app/consumer/main.go
index 89da67e9..a9916f3a 100644
--- a/tests/grpc-app/consumer/main.go
+++ b/tests/grpc-app/consumer/main.go
@@ -19,6 +19,7 @@ package main
import (
"context"
+ "encoding/json"
"flag"
"fmt"
"log"
@@ -26,105 +27,28 @@ import (
"os"
"os/signal"
"regexp"
- "strconv"
"strings"
+ "sync"
"syscall"
"time"
"google.golang.org/grpc"
+ "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/reflection"
- "google.golang.org/grpc/xds"
+ "google.golang.org/grpc/status"
+ _ "google.golang.org/grpc/xds"
pb "github.com/apache/dubbo-kubernetes/tests/grpc-app/proto"
)
var (
- port = flag.Int("port", 17070, "gRPC server port")
+ port = flag.Int("port", 17171, "gRPC server port for ForwardEcho
testing")
+ testServer *grpc.Server
)
-type echoServer struct {
- pb.UnimplementedEchoServiceServer
- pb.UnimplementedEchoTestServiceServer
- hostname string
- serviceVersion string
- namespace string
- instanceIP string
- cluster string
- servicePort int
-}
-
-func (s *echoServer) Echo(ctx context.Context, req *pb.EchoRequest)
(*pb.EchoResponse, error) {
- if req == nil {
- return nil, fmt.Errorf("request is nil")
- }
- log.Printf("Received: %v", req.Message)
- return &pb.EchoResponse{
- Message: req.Message,
- Hostname: s.hostname,
- ServiceVersion: s.serviceVersion,
- Namespace: s.namespace,
- Ip: s.instanceIP,
- Cluster: s.cluster,
- ServicePort: int32(s.servicePort),
- }, nil
-}
-
-func (s *echoServer) StreamEcho(req *pb.EchoRequest, stream
pb.EchoService_StreamEchoServer) error {
- if req == nil {
- return fmt.Errorf("request is nil")
- }
- if stream == nil {
- return fmt.Errorf("stream is nil")
- }
- log.Printf("StreamEcho received: %v", req.Message)
- for i := 0; i < 3; i++ {
- if err := stream.Send(&pb.EchoResponse{
- Message: fmt.Sprintf("%s [%d]", req.Message, i),
- Hostname: s.hostname,
- }); err != nil {
- log.Printf("StreamEcho send error: %v", err)
- return err
- }
- }
- return nil
-}
-
-func (s *echoServer) ForwardEcho(ctx context.Context, req
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
- if req == nil {
- return nil, fmt.Errorf("request is nil")
- }
-
- count := req.Count
- if count < 0 {
- count = 0
- }
- if count > 100 {
- count = 100
- }
-
- log.Printf("ForwardEcho called: url=%s, count=%d", req.Url, count)
-
- output := make([]string, 0, count)
- for i := int32(0); i < count; i++ {
- line := fmt.Sprintf("[%d body] Hostname=%s ServiceVersion=%s
ServicePort=%d Namespace=%s",
- i, s.hostname, s.serviceVersion, s.servicePort,
s.namespace)
- if s.instanceIP != "" {
- line += fmt.Sprintf(" IP=%s", s.instanceIP)
- }
- if s.cluster != "" {
- line += fmt.Sprintf(" Cluster=%s", s.cluster)
- }
- output = append(output, line)
- }
-
- return &pb.ForwardEchoResponse{
- Output: output,
- }, nil
-}
-
// grpcLogger filters out xDS informational logs that are incorrectly marked
as ERROR
type grpcLogger struct {
logger *log.Logger
@@ -158,7 +82,6 @@ func cleanMessage(msg string) string {
func (l *grpcLogger) Info(args ...interface{}) {
msg := fmt.Sprint(args...)
- // Filter out xDS "entering mode: SERVING" logs
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
@@ -201,18 +124,9 @@ func (l *grpcLogger) Warningf(format string, args
...interface{}) {
func (l *grpcLogger) Error(args ...interface{}) {
msg := fmt.Sprint(args...)
- // Filter out xDS "entering mode: SERVING" logs that are incorrectly
marked as ERROR
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
- // Filter out common connection reset errors - these are normal network
behavior
- // when clients disconnect before completing the HTTP/2 handshake
- if strings.Contains(msg, "connection reset by peer") ||
- strings.Contains(msg, "failed to receive the preface from
client") ||
- strings.Contains(msg, "connection error") {
- // These are normal network events, log at DEBUG level instead
of ERROR
- return
- }
msg = cleanMessage(msg)
l.logger.Print("ERROR: ", msg)
}
@@ -222,12 +136,6 @@ func (l *grpcLogger) Errorln(args ...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
- // Filter out common connection reset errors - these are normal network
behavior
- if strings.Contains(msg, "connection reset by peer") ||
- strings.Contains(msg, "failed to receive the preface from
client") ||
- strings.Contains(msg, "connection error") {
- return
- }
msg = cleanMessage(msg)
l.logger.Print("ERROR: ", msg)
}
@@ -237,12 +145,6 @@ func (l *grpcLogger) Errorf(format string, args
...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
- // Filter out common connection reset errors - these are normal network
behavior
- if strings.Contains(msg, "connection reset by peer") ||
- strings.Contains(msg, "failed to receive the preface from
client") ||
- strings.Contains(msg, "connection error") {
- return
- }
msg = cleanMessage(msg)
l.logger.Printf("ERROR: %s", msg)
}
@@ -263,139 +165,578 @@ func (l *grpcLogger) V(level int) bool {
return level <= 0
}
-// waitForBootstrapFile waits for the grpc-bootstrap.json file to exist
-// This is necessary because the dubbo-proxy sidecar needs time to generate
the file
-func waitForBootstrapFile(bootstrapPath string, maxWait time.Duration) error {
- log.Printf("Waiting for bootstrap file to exist: %s (max wait: %v)",
bootstrapPath, maxWait)
+func main() {
+ flag.Parse()
- ctx, cancel := context.WithTimeout(context.Background(), maxWait)
- defer cancel()
+ // Set custom gRPC logger to filter out xDS informational logs
+ // The "ERROR: [xds] Listener entering mode: SERVING" is actually an
informational log
+ grpclog.SetLoggerV2(&grpcLogger{
+ logger: log.New(os.Stderr, "", log.LstdFlags),
+ })
- ticker := time.NewTicker(500 * time.Millisecond)
- defer ticker.Stop()
+ go startTestServer(*port)
- startTime := time.Now()
- for {
- // Check if file exists and is not empty
- if info, err := os.Stat(bootstrapPath); err == nil &&
info.Size() > 0 {
- log.Printf("Bootstrap file found after %v: %s",
time.Since(startTime), bootstrapPath)
- return nil
- }
+ log.Printf("Consumer running. Test server listening on port %d for
ForwardEcho", *port)
- // Check for timeout
- select {
- case <-ctx.Done():
- return fmt.Errorf("timeout waiting for bootstrap file:
%s (waited %v)", bootstrapPath, time.Since(startTime))
- case <-ticker.C:
- // Continue waiting
- }
- }
-}
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+ <-sigChan
+ log.Println("Shutting down...")
-func firstNonEmpty(values ...string) string {
- for _, v := range values {
- if strings.TrimSpace(v) != "" {
- return v
- }
+ if testServer != nil {
+ log.Println("Stopping test server...")
+ testServer.GracefulStop()
}
- return ""
}
-func main() {
- flag.Parse()
+func startTestServer(port int) {
+ lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
+ if err != nil {
+ log.Printf("Failed to listen on port %d: %v", port, err)
+ return
+ }
- // Set custom gRPC logger to filter out xDS informational logs
- // The "ERROR: [xds] Listener entering mode: SERVING" is actually an
informational log
- grpclog.SetLoggerV2(&grpcLogger{
- logger: log.New(os.Stderr, "", log.LstdFlags),
+ testServer = grpc.NewServer()
+ pb.RegisterEchoTestServiceServer(testServer, &testServerImpl{
+ connCache: make(map[string]*cachedConnection),
})
+ reflection.Register(testServer)
- hostname, _ := os.Hostname()
- if hostname == "" {
- hostname = "unknown"
+ log.Printf("Test server listening on port %d for ForwardEcho
(reflection enabled)", port)
+ if err := testServer.Serve(lis); err != nil {
+ log.Printf("Test server error: %v", err)
}
+}
+
+type cachedConnection struct {
+ conn *grpc.ClientConn
+ createdAt time.Time
+}
- namespace := firstNonEmpty(os.Getenv("SERVICE_NAMESPACE"),
os.Getenv("POD_NAMESPACE"), "default")
- serviceVersion := firstNonEmpty(
- os.Getenv("SERVICE_VERSION"),
- os.Getenv("POD_VERSION"),
- os.Getenv("VERSION"),
- )
- if serviceVersion == "" {
- serviceVersion = "unknown"
+type testServerImpl struct {
+ pb.UnimplementedEchoTestServiceServer
+ // Connection cache: map from URL to cached connection
+ connCache map[string]*cachedConnection
+ connMutex sync.RWMutex
+}
+
+// formatGRPCError formats gRPC errors in a style similar to grpcurl's output
+func formatGRPCError(err error, index int32, total int32) string {
+ if err == nil {
+ return ""
}
- cluster := os.Getenv("SERVICE_CLUSTER")
- instanceIP := os.Getenv("INSTANCE_IP")
- servicePort := *port
- if sp := os.Getenv("SERVICE_PORT"); sp != "" {
- if parsed, err := strconv.Atoi(sp); err == nil {
- servicePort = parsed
+
+ // Extract gRPC status code and message using status package
+ code := "Unknown"
+ message := err.Error()
+
+ // Try to extract gRPC status
+ if st, ok := status.FromError(err); ok {
+ code = st.Code().String()
+ message = st.Message()
+ } else {
+ // Fallback: try to extract from error string
+ if strings.Contains(message, "code = ") {
+ // Extract code like "code = Unavailable"
+ codeMatch := regexp.MustCompile(`code =
(\w+)`).FindStringSubmatch(message)
+ if len(codeMatch) > 1 {
+ code = codeMatch[1]
+ }
+ // Extract message after "desc = "
+ descMatch := regexp.MustCompile(`desc =
"?([^"]+)"?`).FindStringSubmatch(message)
+ if len(descMatch) > 1 {
+ message = descMatch[1]
+ } else {
+ // If no desc, try to extract message after code
+ parts := strings.SplitN(message, "desc = ", 2)
+ if len(parts) > 1 {
+ message = strings.Trim(parts[1], `"`)
+ }
+ }
}
}
- // Get bootstrap file path from environment variable or use default
+ // Format similar to grpcurl (single line format)
+ if total == 1 {
+ return fmt.Sprintf("ERROR:\nCode: %s\nMessage: %s", code,
message)
+ }
+ return fmt.Sprintf("[%d] Error: rpc error: code = %s desc = %s", index,
code, message)
+}
+
+func (s *testServerImpl) ForwardEcho(ctx context.Context, req
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
+ if req == nil {
+ return nil, fmt.Errorf("request is nil")
+ }
+
+ if req.Url == "" {
+ return nil, fmt.Errorf("url is required")
+ }
+
+ count := req.Count
+ if count < 0 {
+ count = 0
+ }
+ if count > 100 {
+ count = 100
+ }
+
+ log.Printf("ForwardEcho: url=%s, count=%d", req.Url, count)
+
+ // Check bootstrap configuration
bootstrapPath := os.Getenv("GRPC_XDS_BOOTSTRAP")
if bootstrapPath == "" {
- bootstrapPath = "/etc/dubbo/proxy/grpc-bootstrap.json"
- log.Printf("GRPC_XDS_BOOTSTRAP not set, using default: %s",
bootstrapPath)
+ return nil, fmt.Errorf("GRPC_XDS_BOOTSTRAP environment variable
is not set")
}
- // Wait for bootstrap file to exist before creating xDS server
- // The dubbo-proxy sidecar needs time to generate this file
- if err := waitForBootstrapFile(bootstrapPath, 60*time.Second); err !=
nil {
- log.Fatalf("Failed to wait for bootstrap file: %v", err)
+ // Verify bootstrap file exists
+ if _, err := os.Stat(bootstrapPath); os.IsNotExist(err) {
+ return nil, fmt.Errorf("bootstrap file does not exist: %s",
bootstrapPath)
}
- // Create xDS-enabled gRPC server
- // For proxyless gRPC, we use xds.NewGRPCServer() instead of
grpc.NewServer()
- creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
- FallbackCreds: insecure.NewCredentials(),
- })
+ // Read bootstrap file to verify UDS socket
+ bootstrapData, err := os.ReadFile(bootstrapPath)
if err != nil {
- log.Fatalf("Failed to create xDS server credentials: %v", err)
+ return nil, fmt.Errorf("failed to read bootstrap file: %v", err)
}
- server, err := xds.NewGRPCServer(grpc.Creds(creds))
- if err != nil {
- log.Fatalf("Failed to create xDS gRPC server: %v", err)
+ var bootstrapJSON map[string]interface{}
+ if err := json.Unmarshal(bootstrapData, &bootstrapJSON); err != nil {
+ return nil, fmt.Errorf("failed to parse bootstrap file: %v",
err)
}
- es := &echoServer{
- hostname: hostname,
- serviceVersion: serviceVersion,
- namespace: namespace,
- instanceIP: instanceIP,
- cluster: cluster,
- servicePort: servicePort,
+ // Extract UDS socket path
+ var udsPath string
+ if xdsServers, ok := bootstrapJSON["xds_servers"].([]interface{}); ok
&& len(xdsServers) > 0 {
+ if server, ok := xdsServers[0].(map[string]interface{}); ok {
+ if serverURI, ok := server["server_uri"].(string); ok {
+ if strings.HasPrefix(serverURI, "unix://") {
+ udsPath = strings.TrimPrefix(serverURI,
"unix://")
+ if _, err := os.Stat(udsPath);
os.IsNotExist(err) {
+ return nil, fmt.Errorf("UDS
socket does not exist: %s", udsPath)
+ }
+ }
+ }
+ }
}
- pb.RegisterEchoServiceServer(server, es)
- pb.RegisterEchoTestServiceServer(server, es)
- // Enable reflection API for grpcurl to discover services
- reflection.Register(server)
- lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
- if err != nil {
- log.Fatalf("Failed to listen: %v", err)
+ // CRITICAL: Reuse connections to avoid creating new xDS connections
for each RPC call
+ // This prevents the RDS request loop issue and ensures stable
connection state
+ s.connMutex.RLock()
+ cached, exists := s.connCache[req.Url]
+ var conn *grpc.ClientConn
+ if exists && cached != nil {
+ conn = cached.conn
+ }
+ s.connMutex.RUnlock()
+
+ // CRITICAL: Check if cached connection is still valid and not too old.
+ // When xDS config changes (e.g., TLS is added/removed), gRPC xDS
client should update connections,
+ // but if the connection was established before xDS config was
received, it may use old configuration.
+ // To ensure we use the latest xDS config, we clear connections older
than 10 seconds.
+ // This ensures that when SubsetRule or PeerAuthentication is
created/updated, connections are
+ // rebuilt quickly to use the new configuration.
+ const maxConnectionAge = 10 * time.Second
+ if exists && conn != nil {
+ state := conn.GetState()
+ if state == connectivity.Shutdown {
+ // Connection is closed, remove from cache
+ log.Printf("ForwardEcho: cached connection for %s is
SHUTDOWN, removing from cache", req.Url)
+ s.connMutex.Lock()
+ delete(s.connCache, req.Url)
+ conn = nil
+ exists = false
+ s.connMutex.Unlock()
+ } else if time.Since(cached.createdAt) > maxConnectionAge {
+ // Connection is too old, may be using stale xDS config
(e.g., plaintext when TLS is required)
+ // Clear cache to force reconnection with latest xDS
config
+ log.Printf("ForwardEcho: cached connection for %s is
too old (%v), clearing cache to use latest xDS config", req.Url,
time.Since(cached.createdAt))
+ s.connMutex.Lock()
+ if cachedConn, stillExists := s.connCache[req.Url];
stillExists && cachedConn != nil && cachedConn.conn != nil {
+ cachedConn.conn.Close()
+ }
+ delete(s.connCache, req.Url)
+ conn = nil
+ exists = false
+ s.connMutex.Unlock()
+ }
+ }
+
+ if !exists || conn == nil {
+ // Create new connection
+ s.connMutex.Lock()
+ // Double-check after acquiring write lock
+ if cached, exists = s.connCache[req.Url]; !exists || cached ==
nil || cached.conn == nil {
+ conn = nil
+ // CRITICAL: When TLS is configured (SubsetRule
ISTIO_MUTUAL), gRPC xDS client needs
+ // to fetch certificates from CertificateProvider. The
CertificateProvider uses file_watcher
+ // to read certificate files. If the files are not
ready or CertificateProvider is not
+			// initialized, certificate fetching will time out.
+			// We wait 3 seconds (reduced from 5s for faster startup) to give the
+			// CertificateProvider time to initialize and the certificate files time to
+			// become accessible; this matters most when a SubsetRule has just been
+			// created and TLS was newly enabled, i.e. on the first connection.
+ log.Printf("ForwardEcho: waiting 3 seconds to ensure
CertificateProvider is ready...")
+ time.Sleep(3 * time.Second)
+
+ // Create xDS client credentials
+ // NOTE: FallbackCreds is REQUIRED by gRPC xDS library
for initial connection
+ // before xDS configuration is available. However, once
xDS configures TLS,
+			// the client will use TLS and will NOT fall back to plaintext if TLS fails.
+ // FallbackCreds is only used when xDS has not yet
provided TLS configuration.
+ creds, err :=
xdscreds.NewClientCredentials(xdscreds.ClientOptions{
+ FallbackCreds: insecure.NewCredentials(),
+ })
+ if err != nil {
+ s.connMutex.Unlock()
+ return nil, fmt.Errorf("failed to create xDS
client credentials: %v", err)
+ }
+
+ // Dial with xDS URL - use background context, not the
request context
+ // The request context might timeout before xDS
configuration is received
+ // CRITICAL: When TLS is configured (SubsetRule
ISTIO_MUTUAL), gRPC xDS client needs
+ // to fetch certificates from CertificateProvider. This
may take time, especially on
+ // first connection. We use a longer timeout context to
allow certificate fetching.
+ log.Printf("ForwardEcho: creating new connection for
%s...", req.Url)
+ dialCtx, dialCancel :=
context.WithTimeout(context.Background(), 60*time.Second)
+ conn, err = grpc.DialContext(dialCtx, req.Url,
grpc.WithTransportCredentials(creds))
+ dialCancel()
+ if err != nil {
+ s.connMutex.Unlock()
+ return nil, fmt.Errorf("failed to dial %s: %v",
req.Url, err)
+ }
+ s.connCache[req.Url] = &cachedConnection{
+ conn: conn,
+ createdAt: time.Now(),
+ }
+ log.Printf("ForwardEcho: cached connection for %s",
req.Url)
+ }
+ s.connMutex.Unlock()
+ } else {
+ log.Printf("ForwardEcho: reusing cached connection for %s
(state: %v)", req.Url, conn.GetState())
+ // NOTE: We reuse the cached connection. If xDS config changes
(e.g., TLS is added/removed),
+ // gRPC xDS client should automatically update the connection.
However, if the connection
+ // was established before xDS config was received, it may still
be using old configuration.
+ // We rely on the TLS mismatch detection logic in the RPC error
handling to handle this case.
}
- log.Printf("Starting gRPC proxyless server on port %d (hostname: %s)",
*port, hostname)
-
- go func() {
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
- <-sigChan
- log.Println("Shutting down server...")
- server.GracefulStop()
- }()
-
- // Serve the gRPC server
- // Note: server.Serve returns when the listener is closed, which is
normal during shutdown
- // Connection reset errors are handled by the gRPC library and logged
separately
- if err := server.Serve(lis); err != nil {
- // Only log as fatal if it's not a normal shutdown (listener
closed)
- if !strings.Contains(err.Error(), "use of closed network
connection") {
- log.Fatalf("Failed to serve: %v", err)
+ initialState := conn.GetState()
+ log.Printf("ForwardEcho: initial connection state: %v", initialState)
+
+ // CRITICAL: Even if connection is READY, we need to verify it's still
valid
+ // because xDS configuration may have changed (e.g., from plaintext to
TLS)
+ // and the cached connection might be using old configuration.
+ // gRPC xDS client should automatically update connections, but if the
connection
+ // was established before xDS config was received, it might be using
FallbackCreds (plaintext).
+ // We'll proceed with RPC calls, but if they fail with TLS/plaintext
mismatch errors,
+ // we'll clear the cache and retry.
+ // CRITICAL: When TLS is configured (SubsetRule ISTIO_MUTUAL), gRPC xDS
client needs
+ // to fetch certificates from CertificateProvider during TLS handshake.
The TLS handshake
+ // happens when the connection state transitions to READY. If
CertificateProvider is not ready,
+	// the TLS handshake will time out. We need to wait for the connection to be READY,
+	// which indicates that the TLS handshake has completed successfully.
+ if initialState == connectivity.Ready {
+ log.Printf("ForwardEcho: connection is already READY,
proceeding with RPC calls (will retry with new connection if TLS/plaintext
mismatch detected)")
+ } else {
+ // Only wait for new connections or connections that are not
READY
+ // For gRPC xDS proxyless, we need to wait for the client to
receive and process LDS/CDS/EDS
+ // The connection state may transition: IDLE -> CONNECTING ->
READY (or TRANSIENT_FAILURE -> CONNECTING -> READY)
+ // CRITICAL: When TLS is configured, the TLS handshake happens
during this state transition.
+ // If CertificateProvider is not ready, the TLS handshake will
time out and the connection will fail.
+ // We use a longer timeout (60 seconds) to allow
CertificateProvider to fetch certificates.
+ log.Printf("ForwardEcho: waiting for xDS configuration to be
processed and connection to be ready (60 seconds)...")
+
+ // Wait for state changes with multiple attempts
+ maxWait := 60 * time.Second
+
+ // Wait for state changes, allowing multiple state transitions
+ // CRITICAL: Don't exit on TRANSIENT_FAILURE - it may recover
to READY
+ stateChanged := false
+ currentState := initialState
+ startTime := time.Now()
+ lastStateChangeTime := startTime
+
+ for time.Since(startTime) < maxWait {
+ if currentState == connectivity.Ready {
+ log.Printf("ForwardEcho: connection is READY
after %v", time.Since(startTime))
+ stateChanged = true
+ break
+ }
+
+ // Only exit on Shutdown, not on TransientFailure (it
may recover)
+ if currentState == connectivity.Shutdown {
+ log.Printf("ForwardEcho: connection in %v state
after %v, cannot recover", currentState, time.Since(startTime))
+ break
+ }
+
+ // Wait for state change with remaining timeout
+ remaining := maxWait - time.Since(startTime)
+ if remaining <= 0 {
+ break
+ }
+
+ // Use shorter timeout for each WaitForStateChange call
to allow periodic checks
+ waitTimeout := remaining
+ if waitTimeout > 5*time.Second {
+ waitTimeout = 5 * time.Second
+ }
+
+ stateCtx, stateCancel :=
context.WithTimeout(context.Background(), waitTimeout)
+ if conn.WaitForStateChange(stateCtx, currentState) {
+ newState := conn.GetState()
+ elapsed := time.Since(startTime)
+ log.Printf("ForwardEcho: connection state
changed from %v to %v after %v", currentState, newState, elapsed)
+ stateChanged = true
+ currentState = newState
+ lastStateChangeTime = time.Now()
+
+ // If READY, we're done
+ if newState == connectivity.Ready {
+ stateCancel()
+ break
+ }
+
+ // If we're in TRANSIENT_FAILURE, continue
waiting - it may recover
+ // gRPC xDS client will retry connection when
endpoints become available
+ if newState == connectivity.TransientFailure {
+ log.Printf("ForwardEcho: connection in
TRANSIENT_FAILURE, continuing to wait for recovery (remaining: %v)",
maxWait-elapsed)
+ }
+ } else {
+ // Timeout waiting for state change - check if
we should continue
+ elapsed := time.Since(startTime)
+ if currentState ==
connectivity.TransientFailure {
+ // If we've been in TRANSIENT_FAILURE
for a while, continue waiting
+ // The connection may recover when
endpoints become available
+ if time.Since(lastStateChangeTime) <
10*time.Second {
+ log.Printf("ForwardEcho: still
in TRANSIENT_FAILURE after %v, continuing to wait (remaining: %v)", elapsed,
maxWait-elapsed)
+ } else {
+ log.Printf("ForwardEcho: no
state change after %v, current state: %v (remaining: %v)", elapsed,
currentState, maxWait-elapsed)
+ }
+ } else {
+ log.Printf("ForwardEcho: no state
change after %v, current state: %v (remaining: %v)", elapsed, currentState,
maxWait-elapsed)
+ }
+ }
+ stateCancel()
+ }
+
+ finalState := conn.GetState()
+ log.Printf("ForwardEcho: final connection state: %v
(stateChanged=%v, waited=%v)", finalState, stateChanged, time.Since(startTime))
+
+ // If connection is not READY, log a warning but proceed anyway
+ // The first RPC call may trigger connection establishment
+ if finalState != connectivity.Ready {
+ log.Printf("ForwardEcho: WARNING - connection is not
READY (state=%v), but proceeding with RPC calls", finalState)
}
- log.Printf("Server stopped: %v", err)
}
+
+ // Create client and make RPC calls
+ client := pb.NewEchoServiceClient(conn)
+ output := make([]string, 0, count)
+
+ log.Printf("ForwardEcho: sending %d requests...", count)
+ errorCount := 0
+ firstError := ""
+ for i := int32(0); i < count; i++ {
+ echoReq := &pb.EchoRequest{
+ Message: fmt.Sprintf("Request %d", i+1),
+ }
+
+ currentState := conn.GetState()
+ log.Printf("ForwardEcho: sending request %d (connection state:
%v)...", i+1, currentState)
+
+ // Use longer timeout for requests to allow TLS handshake
completion
+ // When mTLS is configured, certificate fetching and TLS
handshake may take time
+ // Use 30 seconds for all requests to ensure TLS handshake has
enough time
+ timeout := 30 * time.Second
+
+ reqCtx, reqCancel := context.WithTimeout(context.Background(),
timeout)
+ reqStartTime := time.Now()
+ resp, err := client.Echo(reqCtx, echoReq)
+ duration := time.Since(reqStartTime)
+ reqCancel()
+
+ // Check connection state after RPC call
+ stateAfterRPC := conn.GetState()
+ log.Printf("ForwardEcho: request %d completed in %v, connection
state: %v (was %v)", i+1, duration, stateAfterRPC, currentState)
+
+ if err != nil {
+ log.Printf("ForwardEcho: request %d failed: %v", i+1,
err)
+ errorCount++
+ if firstError == "" {
+ firstError = err.Error()
+ }
+
+ // Format error similar to grpcurl output
+ errMsg := formatGRPCError(err, i, count)
+ output = append(output, errMsg)
+
+ // CRITICAL: Only clear cache if we detect specific
TLS/plaintext mismatch errors.
+ // TRANSIENT_FAILURE can occur for many reasons (e.g.,
xDS config updates, endpoint changes),
+ // so we should NOT clear cache on every
TRANSIENT_FAILURE.
+ // Only clear cache when we detect explicit TLS-related
errors that indicate a mismatch.
+ errStr := err.Error()
+ isTLSMismatch := false
+
+ // Check for specific TLS/plaintext mismatch indicators:
+ // - "tls: first record does not look like a TLS
handshake" - client uses TLS but server is plaintext
+ // - "authentication handshake failed" with TLS context
- TLS handshake failed
+ // - "context deadline exceeded" during authentication
handshake - may indicate TLS handshake timeout
+ // (e.g., client using plaintext but server requiring
TLS, or vice versa)
+ // - "fetching trusted roots from CertificateProvider
failed" - CertificateProvider not ready yet
+ // This is a temporary error that should be retried
after waiting for CertificateProvider to be ready
+ // These errors indicate that client and server TLS
configurations are mismatched, or CertificateProvider is not ready
+ if strings.Contains(errStr, "tls: first record does not
look like a TLS handshake") ||
+ (strings.Contains(errStr, "authentication
handshake failed") && strings.Contains(errStr, "tls:")) ||
+ (strings.Contains(errStr, "authentication
handshake failed") && strings.Contains(errStr, "context deadline exceeded")) ||
+ strings.Contains(errStr, "fetching trusted
roots from CertificateProvider failed") {
+ isTLSMismatch = true
+ log.Printf("ForwardEcho: detected TLS/plaintext
mismatch or CertificateProvider not ready error: %v", err)
+ }
+
+ // CRITICAL: When TLS mismatch is detected, immediately
clear cache and force reconnection.
+ // This ensures that:
+ // 1. If client config changed (plaintext -> TLS), new
connection uses TLS
+ // 2. If server config changed (TLS -> plaintext), new
connection uses plaintext
+ // 3. Connection behavior is consistent with current
xDS configuration
+ // According to Istio proxyless gRPC behavior:
+ // - When only client TLS (SubsetRule ISTIO_MUTUAL) but
server plaintext: connection SHOULD FAIL
+ // - When client TLS + server mTLS (PeerAuthentication
STRICT): connection SHOULD SUCCEED
+ // - When both plaintext: connection SHOULD SUCCEED
+ // By clearing cache and reconnecting, we ensure
connection uses current xDS config.
+ if isTLSMismatch {
+ // Check if this is a CertificateProvider not
ready error (temporary) vs configuration mismatch (persistent)
+ isCertProviderNotReady :=
strings.Contains(errStr, "fetching trusted roots from CertificateProvider
failed")
+
+ // Clear cache and force reconnection on every
TLS mismatch detection
+ // This ensures we always try to use the latest
xDS configuration
+ if isCertProviderNotReady {
+ log.Printf("ForwardEcho: WARNING -
CertificateProvider not ready yet: %v", err)
+ log.Printf("ForwardEcho: NOTE - This is
a temporary error. CertificateProvider needs time to initialize.")
+ log.Printf("ForwardEcho: Clearing
connection cache and waiting for CertificateProvider to be ready...")
+ } else {
+ log.Printf("ForwardEcho: WARNING -
detected TLS/plaintext mismatch error: %v", err)
+ log.Printf("ForwardEcho: NOTE - This
error indicates that client and server TLS configuration are mismatched")
+ log.Printf("ForwardEcho: This usually
happens when:")
+ log.Printf("ForwardEcho: 1.
SubsetRule with ISTIO_MUTUAL exists but PeerAuthentication with STRICT does not
(client TLS, server plaintext)")
+ log.Printf("ForwardEcho: 2.
SubsetRule was deleted but cached connection still uses TLS")
+ log.Printf("ForwardEcho: Clearing
connection cache to force reconnection with updated xDS config...")
+ }
+
+ s.connMutex.Lock()
+ if cachedConn, stillExists :=
s.connCache[req.Url]; stillExists && cachedConn != nil && cachedConn.conn !=
nil {
+ cachedConn.conn.Close()
+ }
+ delete(s.connCache, req.Url)
+ conn = nil
+ s.connMutex.Unlock()
+
+ // CRITICAL: Wait for xDS config to propagate
and be processed by gRPC xDS client.
+ // When CDS/LDS config changes, it takes time
for:
+ // 1. Control plane to push new config to gRPC
xDS client
+ // 2. gRPC xDS client to process and apply new
config
+ // 3. CertificateProvider to be ready and
certificate files to be accessible
+ // 4. New connections to use updated config
+ // For CertificateProvider not ready errors, we
wait a shorter time (3 seconds), since recovery is usually faster.
+ // For configuration mismatch, we wait longer
(10 seconds) to ensure config has propagated.
+ // Reduced wait times for faster recovery while
still ensuring reliability.
+ if isCertProviderNotReady {
+ log.Printf("ForwardEcho: waiting 3
seconds for CertificateProvider to be ready...")
+ time.Sleep(3 * time.Second)
+ } else {
+ log.Printf("ForwardEcho: waiting 10
seconds for xDS config to propagate and CertificateProvider to be ready...")
+ time.Sleep(10 * time.Second)
+ }
+
+ // Recreate connection - this will use current
xDS config
+ // CRITICAL: When TLS is configured, gRPC xDS
client needs to fetch certificates
+ // from CertificateProvider. Use a longer
timeout to allow certificate fetching.
+ log.Printf("ForwardEcho: recreating connection
with current xDS config...")
+ creds, credErr :=
xdscreds.NewClientCredentials(xdscreds.ClientOptions{
+ FallbackCreds:
insecure.NewCredentials(),
+ })
+ if credErr != nil {
+ log.Printf("ForwardEcho: failed to
create xDS client credentials: %v", credErr)
+ // Continue to record error
+ } else {
+ // Wait additional time before dialing
to ensure CertificateProvider is ready
+ // Reduced from 3s to 2s for faster
recovery
+ log.Printf("ForwardEcho: waiting 2
seconds before dialing to ensure CertificateProvider is ready...")
+ time.Sleep(2 * time.Second)
+
+ dialCtx, dialCancel :=
context.WithTimeout(context.Background(), 60*time.Second)
+ newConn, dialErr :=
grpc.DialContext(dialCtx, req.Url, grpc.WithTransportCredentials(creds))
+ dialCancel()
+ if dialErr != nil {
+ log.Printf("ForwardEcho: failed
to dial %s: %v", req.Url, dialErr)
+ // Continue to record error
+ } else {
+ s.connMutex.Lock()
+ s.connCache[req.Url] =
&cachedConnection{
+ conn: newConn,
+ createdAt: time.Now(),
+ }
+ conn = newConn
+ client =
pb.NewEchoServiceClient(conn)
+ s.connMutex.Unlock()
+ log.Printf("ForwardEcho:
connection recreated, retrying request %d", i+1)
+ // Retry this request with new
connection
+ continue
+ }
+ }
+ }
+ if i < count-1 {
+ waitTime := 2 * time.Second
+ log.Printf("ForwardEcho: waiting %v before next
request...", waitTime)
+ time.Sleep(waitTime)
+ }
+ continue
+ }
+
+ if resp == nil {
+ log.Printf("ForwardEcho: request %d failed: response is
nil", i+1)
+ output = append(output, fmt.Sprintf("[%d] Error:
response is nil", i))
+ continue
+ }
+
+ log.Printf("ForwardEcho: request %d succeeded: Hostname=%s
ServiceVersion=%s Namespace=%s IP=%s",
+ i+1, resp.Hostname, resp.ServiceVersion,
resp.Namespace, resp.Ip)
+
+ lineParts := []string{
+ fmt.Sprintf("[%d body] Hostname=%s", i, resp.Hostname),
+ }
+ if resp.ServiceVersion != "" {
+ lineParts = append(lineParts,
fmt.Sprintf("ServiceVersion=%s", resp.ServiceVersion))
+ }
+ if resp.Namespace != "" {
+ lineParts = append(lineParts,
fmt.Sprintf("Namespace=%s", resp.Namespace))
+ }
+ if resp.Ip != "" {
+ lineParts = append(lineParts, fmt.Sprintf("IP=%s",
resp.Ip))
+ }
+ if resp.Cluster != "" {
+ lineParts = append(lineParts, fmt.Sprintf("Cluster=%s",
resp.Cluster))
+ }
+ if resp.ServicePort > 0 {
+ lineParts = append(lineParts,
fmt.Sprintf("ServicePort=%d", resp.ServicePort))
+ }
+
+ output = append(output, strings.Join(lineParts, " "))
+
+ // Small delay between successful requests to avoid
overwhelming the server
+ if i < count-1 {
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+
+ log.Printf("ForwardEcho: completed %d requests", count)
+
+ // If all requests failed, add summary similar to grpcurl
+ if errorCount > 0 && errorCount == int(count) && firstError != "" {
+ summary := fmt.Sprintf("ERROR:\nCode: Unknown\nMessage: %d/%d
requests had errors; first error: %s", errorCount, count, firstError)
+ // Prepend summary to output
+ output = append([]string{summary}, output...)
+ }
+
+ return &pb.ForwardEchoResponse{
+ Output: output,
+ }, nil
}
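
For reference, a minimal sketch (not part of this commit) of driving the consumer's ForwardEcho test server from another pod. The `consumer` Service name, port 17171, the generated `NewEchoTestServiceClient` constructor, and the `xds:///` URL are assumptions based on the code above, not values fixed by this commit.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "github.com/apache/dubbo-kubernetes/tests/grpc-app/proto"
)

func main() {
	// The test server is a plain gRPC server with reflection enabled, so plaintext credentials suffice here.
	conn, err := grpc.Dial("consumer.grpc-app.svc.cluster.local:17171", // assumed Service name/port
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial consumer test server: %v", err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// Ask the consumer to send 5 Echo requests to the producer over its xDS-managed connection.
	resp, err := pb.NewEchoTestServiceClient(conn).ForwardEcho(ctx, &pb.ForwardEchoRequest{
		Url:   "xds:///producer.grpc-app.svc.cluster.local:7070", // illustrative target
		Count: 5,
	})
	if err != nil {
		log.Fatalf("ForwardEcho: %v", err)
	}
	for _, line := range resp.Output {
		fmt.Println(line)
	}
}
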
diff --git a/tests/grpc-app/docker/dockerfile.consumer
b/tests/grpc-app/docker/dockerfile.consumer
index 78aa4274..8c4def20 100644
--- a/tests/grpc-app/docker/dockerfile.consumer
+++ b/tests/grpc-app/docker/dockerfile.consumer
@@ -47,4 +47,7 @@ WORKDIR /app
COPY --from=builder /build/grpc-consumer /usr/local/bin/grpc-consumer
RUN chmod +x /usr/local/bin/grpc-consumer
+COPY ./grpcurl /usr/local/bin/grpcurl
+RUN chmod +x /usr/local/bin/grpcurl
+
ENTRYPOINT ["/usr/local/bin/grpc-consumer"]
diff --git a/tests/grpc-app/producer/main.go b/tests/grpc-app/producer/main.go
index 8bb23199..52debbe5 100644
--- a/tests/grpc-app/producer/main.go
+++ b/tests/grpc-app/producer/main.go
@@ -19,7 +19,6 @@ package main
import (
"context"
- "encoding/json"
"flag"
"fmt"
"log"
@@ -27,29 +26,105 @@ import (
"os"
"os/signal"
"regexp"
+ "strconv"
"strings"
- "sync"
"syscall"
"time"
"google.golang.org/grpc"
- "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/reflection"
- _ "google.golang.org/grpc/xds"
+ "google.golang.org/grpc/xds"
pb "github.com/apache/dubbo-kubernetes/tests/grpc-app/proto"
)
var (
- target = flag.String("target", "", "Target service address with
xds:/// scheme (optional)")
- count = flag.Int("count", 5, "Number of requests to send")
- port = flag.Int("port", 17171, "gRPC server port for ForwardEcho
testing")
- testServer *grpc.Server
+ port = flag.Int("port", 17070, "gRPC server port")
)
+type echoServer struct {
+ pb.UnimplementedEchoServiceServer
+ pb.UnimplementedEchoTestServiceServer
+ hostname string
+ serviceVersion string
+ namespace string
+ instanceIP string
+ cluster string
+ servicePort int
+}
+
+func (s *echoServer) Echo(ctx context.Context, req *pb.EchoRequest)
(*pb.EchoResponse, error) {
+ if req == nil {
+ return nil, fmt.Errorf("request is nil")
+ }
+ log.Printf("Received: %v", req.Message)
+ return &pb.EchoResponse{
+ Message: req.Message,
+ Hostname: s.hostname,
+ ServiceVersion: s.serviceVersion,
+ Namespace: s.namespace,
+ Ip: s.instanceIP,
+ Cluster: s.cluster,
+ ServicePort: int32(s.servicePort),
+ }, nil
+}
+
+func (s *echoServer) StreamEcho(req *pb.EchoRequest, stream
pb.EchoService_StreamEchoServer) error {
+ if req == nil {
+ return fmt.Errorf("request is nil")
+ }
+ if stream == nil {
+ return fmt.Errorf("stream is nil")
+ }
+ log.Printf("StreamEcho received: %v", req.Message)
+ for i := 0; i < 3; i++ {
+ if err := stream.Send(&pb.EchoResponse{
+ Message: fmt.Sprintf("%s [%d]", req.Message, i),
+ Hostname: s.hostname,
+ }); err != nil {
+ log.Printf("StreamEcho send error: %v", err)
+ return err
+ }
+ }
+ return nil
+}
+
+func (s *echoServer) ForwardEcho(ctx context.Context, req
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
+ if req == nil {
+ return nil, fmt.Errorf("request is nil")
+ }
+
+ count := req.Count
+ if count < 0 {
+ count = 0
+ }
+ if count > 100 {
+ count = 100
+ }
+
+ log.Printf("ForwardEcho called: url=%s, count=%d", req.Url, count)
+
+ output := make([]string, 0, count)
+ for i := int32(0); i < count; i++ {
+ line := fmt.Sprintf("[%d body] Hostname=%s ServiceVersion=%s
ServicePort=%d Namespace=%s",
+ i, s.hostname, s.serviceVersion, s.servicePort,
s.namespace)
+ if s.instanceIP != "" {
+ line += fmt.Sprintf(" IP=%s", s.instanceIP)
+ }
+ if s.cluster != "" {
+ line += fmt.Sprintf(" Cluster=%s", s.cluster)
+ }
+ output = append(output, line)
+ }
+
+ return &pb.ForwardEchoResponse{
+ Output: output,
+ }, nil
+}
+
// grpcLogger filters out xDS informational logs that are incorrectly marked
as ERROR
type grpcLogger struct {
logger *log.Logger
@@ -83,6 +158,7 @@ func cleanMessage(msg string) string {
func (l *grpcLogger) Info(args ...interface{}) {
msg := fmt.Sprint(args...)
+ // Filter out xDS "entering mode: SERVING" logs
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
@@ -125,9 +201,18 @@ func (l *grpcLogger) Warningf(format string, args
...interface{}) {
func (l *grpcLogger) Error(args ...interface{}) {
msg := fmt.Sprint(args...)
+ // Filter out xDS "entering mode: SERVING" logs that are incorrectly
marked as ERROR
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ // Filter out common connection reset errors - these are normal network
behavior
+ // when clients disconnect before completing the HTTP/2 handshake
+ if strings.Contains(msg, "connection reset by peer") ||
+ strings.Contains(msg, "failed to receive the preface from
client") ||
+ strings.Contains(msg, "connection error") {
+		// These are normal network events; suppress them here rather than logging them at ERROR
+ return
+ }
msg = cleanMessage(msg)
l.logger.Print("ERROR: ", msg)
}
@@ -137,6 +222,12 @@ func (l *grpcLogger) Errorln(args ...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ // Filter out common connection reset errors - these are normal network
behavior
+ if strings.Contains(msg, "connection reset by peer") ||
+ strings.Contains(msg, "failed to receive the preface from
client") ||
+ strings.Contains(msg, "connection error") {
+ return
+ }
msg = cleanMessage(msg)
l.logger.Print("ERROR: ", msg)
}
@@ -146,6 +237,12 @@ func (l *grpcLogger) Errorf(format string, args
...interface{}) {
if strings.Contains(msg, "entering mode") && strings.Contains(msg,
"SERVING") {
return
}
+ // Filter out common connection reset errors - these are normal network
behavior
+ if strings.Contains(msg, "connection reset by peer") ||
+ strings.Contains(msg, "failed to receive the preface from
client") ||
+ strings.Contains(msg, "connection error") {
+ return
+ }
msg = cleanMessage(msg)
l.logger.Printf("ERROR: %s", msg)
}
@@ -166,398 +263,143 @@ func (l *grpcLogger) V(level int) bool {
return level <= 0
}
-func main() {
- flag.Parse()
-
- // Set custom gRPC logger to filter out xDS informational logs
- // The "ERROR: [xds] Listener entering mode: SERVING" is actually an
informational log
- grpclog.SetLoggerV2(&grpcLogger{
- logger: log.New(os.Stderr, "", log.LstdFlags),
- })
-
- go startTestServer(*port)
-
- if *target != "" {
- testDirectConnection(*target, *count)
- }
-
- log.Printf("Producer running. Test server listening on port %d for
ForwardEcho", *port)
-
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
- <-sigChan
- log.Println("Shutting down...")
-
- if testServer != nil {
- log.Println("Stopping test server...")
- testServer.GracefulStop()
- }
-}
-
-func testDirectConnection(target string, count int) {
- creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
- FallbackCreds: insecure.NewCredentials(),
- })
- if err != nil {
- log.Fatalf("Failed to create xDS client credentials: %v", err)
- }
-
- ctx := context.Background()
- conn, err := grpc.DialContext(ctx, target,
grpc.WithTransportCredentials(creds))
- if err != nil {
- log.Fatalf("Failed to connect: %v", err)
- }
- defer conn.Close()
-
- client := pb.NewEchoServiceClient(conn)
+// waitForBootstrapFile waits for the grpc-bootstrap.json file to exist
+// This is necessary because the dubbo-proxy sidecar needs time to generate
the file
+func waitForBootstrapFile(bootstrapPath string, maxWait time.Duration) error {
+ log.Printf("Waiting for bootstrap file to exist: %s (max wait: %v)",
bootstrapPath, maxWait)
- log.Printf("Connected to %s, sending %d requests...", target, count)
+ ctx, cancel := context.WithTimeout(context.Background(), maxWait)
+ defer cancel()
- for i := 0; i < count; i++ {
- req := &pb.EchoRequest{
- Message: fmt.Sprintf("Hello from producer [%d]", i+1),
- }
-
- reqCtx, cancel := context.WithTimeout(context.Background(),
10*time.Second)
- resp, err := client.Echo(reqCtx, req)
- cancel()
+ ticker := time.NewTicker(500 * time.Millisecond)
+ defer ticker.Stop()
- if err != nil {
- log.Printf("Request %d failed: %v", i+1, err)
- continue
+ startTime := time.Now()
+ for {
+ // Check if file exists and is not empty
+ if info, err := os.Stat(bootstrapPath); err == nil &&
info.Size() > 0 {
+ log.Printf("Bootstrap file found after %v: %s",
time.Since(startTime), bootstrapPath)
+ return nil
}
- if resp == nil {
- log.Printf("Request %d failed: response is nil", i+1)
- continue
+ // Check for timeout
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("timeout waiting for bootstrap file:
%s (waited %v)", bootstrapPath, time.Since(startTime))
+ case <-ticker.C:
+ // Continue waiting
}
-
- log.Printf("Request %d: Response=%s, Hostname=%s", i+1,
resp.Message, resp.Hostname)
- time.Sleep(500 * time.Millisecond)
}
-
- log.Println("All requests completed")
}
-func startTestServer(port int) {
- lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
- if err != nil {
- log.Printf("Failed to listen on port %d: %v", port, err)
- return
- }
-
- testServer = grpc.NewServer()
- pb.RegisterEchoTestServiceServer(testServer, &testServerImpl{
- connCache: make(map[string]*grpc.ClientConn),
- })
- reflection.Register(testServer)
-
- log.Printf("Test server listening on port %d for ForwardEcho
(reflection enabled)", port)
- if err := testServer.Serve(lis); err != nil {
- log.Printf("Test server error: %v", err)
+func firstNonEmpty(values ...string) string {
+ for _, v := range values {
+ if strings.TrimSpace(v) != "" {
+ return v
+ }
}
+ return ""
}
-type testServerImpl struct {
- pb.UnimplementedEchoTestServiceServer
- // Connection cache: map from URL to gRPC connection
- connCache map[string]*grpc.ClientConn
- connMutex sync.RWMutex
-}
+func main() {
+ flag.Parse()
-func (s *testServerImpl) ForwardEcho(ctx context.Context, req
*pb.ForwardEchoRequest) (*pb.ForwardEchoResponse, error) {
- if req == nil {
- return nil, fmt.Errorf("request is nil")
- }
+ // Set custom gRPC logger to filter out xDS informational logs
+ // The "ERROR: [xds] Listener entering mode: SERVING" is actually an
informational log
+ grpclog.SetLoggerV2(&grpcLogger{
+ logger: log.New(os.Stderr, "", log.LstdFlags),
+ })
- if req.Url == "" {
- return nil, fmt.Errorf("url is required")
+ hostname, _ := os.Hostname()
+ if hostname == "" {
+ hostname = "unknown"
}
- count := req.Count
- if count < 0 {
- count = 0
+ namespace := firstNonEmpty(os.Getenv("SERVICE_NAMESPACE"),
os.Getenv("POD_NAMESPACE"), "default")
+ serviceVersion := firstNonEmpty(
+ os.Getenv("SERVICE_VERSION"),
+ os.Getenv("POD_VERSION"),
+ os.Getenv("VERSION"),
+ )
+ if serviceVersion == "" {
+ serviceVersion = "unknown"
}
- if count > 100 {
- count = 100
+ cluster := os.Getenv("SERVICE_CLUSTER")
+ instanceIP := os.Getenv("INSTANCE_IP")
+ servicePort := *port
+ if sp := os.Getenv("SERVICE_PORT"); sp != "" {
+ if parsed, err := strconv.Atoi(sp); err == nil {
+ servicePort = parsed
+ }
}
- log.Printf("ForwardEcho: url=%s, count=%d", req.Url, count)
-
- // Check bootstrap configuration
+ // Get bootstrap file path from environment variable or use default
bootstrapPath := os.Getenv("GRPC_XDS_BOOTSTRAP")
if bootstrapPath == "" {
- return nil, fmt.Errorf("GRPC_XDS_BOOTSTRAP environment variable
is not set")
+ bootstrapPath = "/etc/dubbo/proxy/grpc-bootstrap.json"
+ log.Printf("GRPC_XDS_BOOTSTRAP not set, using default: %s",
bootstrapPath)
}
- // Verify bootstrap file exists
- if _, err := os.Stat(bootstrapPath); os.IsNotExist(err) {
- return nil, fmt.Errorf("bootstrap file does not exist: %s",
bootstrapPath)
+ // Wait for bootstrap file to exist before creating xDS server
+ // The dubbo-proxy sidecar needs time to generate this file
+ if err := waitForBootstrapFile(bootstrapPath, 60*time.Second); err !=
nil {
+ log.Fatalf("Failed to wait for bootstrap file: %v", err)
}
- // Read bootstrap file to verify UDS socket
- bootstrapData, err := os.ReadFile(bootstrapPath)
+ // Create xDS-enabled gRPC server
+ // For proxyless gRPC, we use xds.NewGRPCServer() instead of
grpc.NewServer()
+ // NOTE: FallbackCreds is REQUIRED by gRPC xDS library for initial
connection
+ // before xDS configuration is available. However, once xDS configures
TLS,
+	// the server will use TLS and will NOT fall back to plaintext if TLS fails.
+ // FallbackCreds is only used when xDS has not yet provided TLS
configuration.
+ creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
+ FallbackCreds: insecure.NewCredentials(),
+ })
if err != nil {
- return nil, fmt.Errorf("failed to read bootstrap file: %v", err)
+ log.Fatalf("Failed to create xDS server credentials: %v", err)
}
- var bootstrapJSON map[string]interface{}
- if err := json.Unmarshal(bootstrapData, &bootstrapJSON); err != nil {
- return nil, fmt.Errorf("failed to parse bootstrap file: %v",
err)
- }
-
- // Extract UDS socket path
- var udsPath string
- if xdsServers, ok := bootstrapJSON["xds_servers"].([]interface{}); ok
&& len(xdsServers) > 0 {
- if server, ok := xdsServers[0].(map[string]interface{}); ok {
- if serverURI, ok := server["server_uri"].(string); ok {
- if strings.HasPrefix(serverURI, "unix://") {
- udsPath = strings.TrimPrefix(serverURI,
"unix://")
- if _, err := os.Stat(udsPath);
os.IsNotExist(err) {
- return nil, fmt.Errorf("UDS
socket does not exist: %s", udsPath)
- }
- }
- }
- }
- }
-
- // CRITICAL: Reuse connections to avoid creating new xDS connections
for each RPC call
- // This prevents the RDS request loop issue and ensures stable
connection state
- s.connMutex.RLock()
- conn, exists := s.connCache[req.Url]
- s.connMutex.RUnlock()
-
- // Check if cached connection is still valid (not closed/shutdown)
- if exists && conn != nil {
- state := conn.GetState()
- if state == connectivity.Shutdown {
- // Connection is closed, remove from cache and create
new one
- log.Printf("ForwardEcho: cached connection for %s is
SHUTDOWN, removing from cache", req.Url)
- s.connMutex.Lock()
- delete(s.connCache, req.Url)
- conn = nil
- exists = false
- s.connMutex.Unlock()
- }
+ server, err := xds.NewGRPCServer(grpc.Creds(creds))
+ if err != nil {
+ log.Fatalf("Failed to create xDS gRPC server: %v", err)
}
- if !exists || conn == nil {
- // Create new connection
- s.connMutex.Lock()
- // Double-check after acquiring write lock
- if conn, exists = s.connCache[req.Url]; !exists || conn == nil {
- // Create xDS client credentials
- creds, err :=
xdscreds.NewClientCredentials(xdscreds.ClientOptions{
- FallbackCreds: insecure.NewCredentials(),
- })
- if err != nil {
- s.connMutex.Unlock()
- return nil, fmt.Errorf("failed to create xDS
client credentials: %v", err)
- }
-
- // Dial with xDS URL - use background context, not the
request context
- // The request context might timeout before xDS
configuration is received
- log.Printf("ForwardEcho: creating new connection for
%s...", req.Url)
- conn, err = grpc.DialContext(context.Background(),
req.Url, grpc.WithTransportCredentials(creds))
- if err != nil {
- s.connMutex.Unlock()
- return nil, fmt.Errorf("failed to dial %s: %v",
req.Url, err)
- }
- s.connCache[req.Url] = conn
- log.Printf("ForwardEcho: cached connection for %s",
req.Url)
- }
- s.connMutex.Unlock()
- } else {
- log.Printf("ForwardEcho: reusing cached connection for %s
(state: %v)", req.Url, conn.GetState())
+ es := &echoServer{
+ hostname: hostname,
+ serviceVersion: serviceVersion,
+ namespace: namespace,
+ instanceIP: instanceIP,
+ cluster: cluster,
+ servicePort: servicePort,
}
+ pb.RegisterEchoServiceServer(server, es)
+ pb.RegisterEchoTestServiceServer(server, es)
+ // Enable reflection API for grpcurl to discover services
+ reflection.Register(server)
- initialState := conn.GetState()
- log.Printf("ForwardEcho: initial connection state: %v", initialState)
-
- // CRITICAL: If connection is already READY, use it directly without
waiting
- // For cached connections, they should already be in READY state
- if initialState == connectivity.Ready {
- log.Printf("ForwardEcho: connection is already READY,
proceeding with RPC calls")
- } else {
- // Only wait for new connections or connections that are not
READY
- // For gRPC xDS proxyless, we need to wait for the client to
receive and process LDS/CDS/EDS
- // The connection state may transition: IDLE -> CONNECTING ->
READY (or TRANSIENT_FAILURE -> CONNECTING -> READY)
- log.Printf("ForwardEcho: waiting for xDS configuration to be
processed and connection to be ready (30 seconds)...")
-
- // Wait for state changes with multiple attempts
- maxWait := 30 * time.Second
-
- // Wait for state changes, allowing multiple state transitions
- // CRITICAL: Don't exit on TRANSIENT_FAILURE - it may recover
to READY
- stateChanged := false
- currentState := initialState
- startTime := time.Now()
- lastStateChangeTime := startTime
-
- for time.Since(startTime) < maxWait {
- if currentState == connectivity.Ready {
- log.Printf("ForwardEcho: connection is READY
after %v", time.Since(startTime))
- stateChanged = true
- break
- }
-
- // Only exit on Shutdown, not on TransientFailure (it
may recover)
- if currentState == connectivity.Shutdown {
- log.Printf("ForwardEcho: connection in %v state
after %v, cannot recover", currentState, time.Since(startTime))
- break
- }
-
- // Wait for state change with remaining timeout
- remaining := maxWait - time.Since(startTime)
- if remaining <= 0 {
- break
- }
-
- // Use shorter timeout for each WaitForStateChange call
to allow periodic checks
- waitTimeout := remaining
- if waitTimeout > 5*time.Second {
- waitTimeout = 5 * time.Second
- }
-
- stateCtx, stateCancel :=
context.WithTimeout(context.Background(), waitTimeout)
- if conn.WaitForStateChange(stateCtx, currentState) {
- newState := conn.GetState()
- elapsed := time.Since(startTime)
- log.Printf("ForwardEcho: connection state
changed from %v to %v after %v", currentState, newState, elapsed)
- stateChanged = true
- currentState = newState
- lastStateChangeTime = time.Now()
-
- // If READY, we're done
- if newState == connectivity.Ready {
- stateCancel()
- break
- }
-
- // If we're in TRANSIENT_FAILURE, continue
waiting - it may recover
- // gRPC xDS client will retry connection when
endpoints become available
- if newState == connectivity.TransientFailure {
- log.Printf("ForwardEcho: connection in
TRANSIENT_FAILURE, continuing to wait for recovery (remaining: %v)",
maxWait-elapsed)
- }
- } else {
- // Timeout waiting for state change - check if
we should continue
- elapsed := time.Since(startTime)
- if currentState ==
connectivity.TransientFailure {
- // If we've been in TRANSIENT_FAILURE
for a while, continue waiting
- // The connection may recover when
endpoints become available
- if time.Since(lastStateChangeTime) <
10*time.Second {
- log.Printf("ForwardEcho: still
in TRANSIENT_FAILURE after %v, continuing to wait (remaining: %v)", elapsed,
maxWait-elapsed)
- } else {
- log.Printf("ForwardEcho: no
state change after %v, current state: %v (remaining: %v)", elapsed,
currentState, maxWait-elapsed)
- }
- } else {
- log.Printf("ForwardEcho: no state
change after %v, current state: %v (remaining: %v)", elapsed, currentState,
maxWait-elapsed)
- }
- }
- stateCancel()
- }
-
- finalState := conn.GetState()
- log.Printf("ForwardEcho: final connection state: %v
(stateChanged=%v, waited=%v)", finalState, stateChanged, time.Since(startTime))
-
- // If connection is not READY, log a warning but proceed anyway
- // The first RPC call may trigger connection establishment
- if finalState != connectivity.Ready {
- log.Printf("ForwardEcho: WARNING - connection is not
READY (state=%v), but proceeding with RPC calls", finalState)
- }
+ lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
+ if err != nil {
+ log.Fatalf("Failed to listen: %v", err)
}
- // Create client and make RPC calls
- client := pb.NewEchoServiceClient(conn)
- output := make([]string, 0, count)
-
- log.Printf("ForwardEcho: sending %d requests...", count)
- for i := int32(0); i < count; i++ {
- echoReq := &pb.EchoRequest{
- Message: fmt.Sprintf("Request %d", i+1),
- }
-
- currentState := conn.GetState()
- log.Printf("ForwardEcho: sending request %d (connection state:
%v)...", i+1, currentState)
-
- // Use longer timeout for first request to allow connection
establishment
- // For subsequent requests, use shorter timeout but still allow
for retries
- timeout := 30 * time.Second
- if i > 0 {
- timeout = 20 * time.Second
- }
-
- reqCtx, reqCancel := context.WithTimeout(context.Background(),
timeout)
- reqStartTime := time.Now()
- resp, err := client.Echo(reqCtx, echoReq)
- duration := time.Since(reqStartTime)
- reqCancel()
-
- // Check connection state after RPC call
- stateAfterRPC := conn.GetState()
- log.Printf("ForwardEcho: request %d completed in %v, connection
state: %v (was %v)", i+1, duration, stateAfterRPC, currentState)
-
- if err != nil {
- log.Printf("ForwardEcho: request %d failed: %v", i+1,
err)
- output = append(output, fmt.Sprintf("[%d] Error: %v",
i, err))
-
- // If connection is in TRANSIENT_FAILURE, wait a bit
before next request
- // to allow gRPC client to retry and recover
- if stateAfterRPC == connectivity.TransientFailure && i
< count-1 {
- waitTime := 2 * time.Second
- log.Printf("ForwardEcho: connection in
TRANSIENT_FAILURE, waiting %v before next request...", waitTime)
- time.Sleep(waitTime)
-
- // Check if connection recovered
- newState := conn.GetState()
- if newState == connectivity.Ready {
- log.Printf("ForwardEcho: connection
recovered to READY after wait")
- } else {
- log.Printf("ForwardEcho: connection
state after wait: %v", newState)
- }
- }
- continue
- }
-
- if resp == nil {
- log.Printf("ForwardEcho: request %d failed: response is
nil", i+1)
- output = append(output, fmt.Sprintf("[%d] Error:
response is nil", i))
- continue
- }
-
- log.Printf("ForwardEcho: request %d succeeded: Hostname=%s
ServiceVersion=%s Namespace=%s IP=%s",
- i+1, resp.Hostname, resp.ServiceVersion,
resp.Namespace, resp.Ip)
-
- lineParts := []string{
- fmt.Sprintf("[%d body] Hostname=%s", i, resp.Hostname),
- }
- if resp.ServiceVersion != "" {
- lineParts = append(lineParts,
fmt.Sprintf("ServiceVersion=%s", resp.ServiceVersion))
- }
- if resp.Namespace != "" {
- lineParts = append(lineParts,
fmt.Sprintf("Namespace=%s", resp.Namespace))
- }
- if resp.Ip != "" {
- lineParts = append(lineParts, fmt.Sprintf("IP=%s",
resp.Ip))
- }
- if resp.Cluster != "" {
- lineParts = append(lineParts, fmt.Sprintf("Cluster=%s",
resp.Cluster))
- }
- if resp.ServicePort > 0 {
- lineParts = append(lineParts,
fmt.Sprintf("ServicePort=%d", resp.ServicePort))
- }
-
- output = append(output, strings.Join(lineParts, " "))
-
- // Small delay between successful requests to avoid
overwhelming the server
- if i < count-1 {
- time.Sleep(100 * time.Millisecond)
+ log.Printf("Starting gRPC proxyless server on port %d (hostname: %s)",
*port, hostname)
+
+ go func() {
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+ <-sigChan
+ log.Println("Shutting down server...")
+ server.GracefulStop()
+ }()
+
+ // Serve the gRPC server
+ // Note: server.Serve returns when the listener is closed, which is
normal during shutdown
+ // Connection reset errors are handled by the gRPC library and logged
separately
+ if err := server.Serve(lis); err != nil {
+ // Only log as fatal if it's not a normal shutdown (listener
closed)
+ if !strings.Contains(err.Error(), "use of closed network
connection") {
+ log.Fatalf("Failed to serve: %v", err)
}
+ log.Printf("Server stopped: %v", err)
}
-
- log.Printf("ForwardEcho: completed %d requests", count)
-
- return &pb.ForwardEchoResponse{
- Output: output,
- }, nil
}