This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new e51536d9 fix krt and xds api (#822)
e51536d9 is described below
commit e51536d9ec888ed58e60b7b9ddac0dd757f7763e
Author: Jian Zhong <[email protected]>
AuthorDate: Tue Nov 18 21:44:35 2025 +0800
fix krt and xds api (#822)
* fix krt and xds api
* fix ci
* fix ci
---
dubbod/planet/pkg/bootstrap/server.go | 89 ++++++-
dubbod/planet/pkg/bootstrap/validation.go | 2 +
dubbod/planet/pkg/config/aggregate/config.go | 11 +-
dubbod/planet/pkg/config/kube/crdclient/client.go | 161 ++++++++++--
dubbod/planet/pkg/config/kube/crdclient/types.go | 275 +++++++++++++++++++--
dubbod/planet/pkg/model/push_context.go | 196 ++++++++++++++-
dubbod/planet/pkg/model/serviceroute.go | 34 ++-
dubbod/planet/pkg/model/subsetrule.go | 17 ++
dubbod/planet/pkg/networking/grpcgen/cds.go | 77 +++++-
dubbod/planet/pkg/networking/grpcgen/rds.go | 147 +++++++++--
dubbod/planet/pkg/xds/ads.go | 10 +-
dubbod/planet/pkg/xds/delta.go | 23 +-
.../planet/pkg/xds/endpoints/endpoint_builder.go | 14 +-
dubbod/planet/pkg/xds/xdsgen.go | 50 ++++
manifests/charts/base/files/crd-all.yaml | 4 +-
.../dubbo-discovery/templates/clusterrole.yaml | 3 +
.../templates/validatingwebhookconfiguration.yaml | 37 ++-
pkg/config/schema/collections/collections.agent.go | 6 +-
pkg/config/schema/collections/collections.go | 6 +-
pkg/config/schema/gvk/resources.go | 2 +-
pkg/config/schema/kind/resources.go | 2 +-
pkg/config/schema/kubeclient/resources.go | 67 ++++-
pkg/kube/kclient/client.go | 138 ++++++++---
pkg/kube/kclient/delayed.go | 46 +++-
pkg/kube/krt/informer.go | 40 ++-
pkg/kube/namespace/filter.go | 11 +-
pkg/xds/server.go | 33 +++
samples/grpc-app/grpc-app.yaml | 76 +++++-
tests/grpc-app/consumer/main.go | 62 ++++-
tests/grpc-app/producer/main.go | 25 +-
tests/grpc-app/proto/echo.proto | 5 +
31 files changed, 1499 insertions(+), 170 deletions(-)
diff --git a/dubbod/planet/pkg/bootstrap/server.go
b/dubbod/planet/pkg/bootstrap/server.go
index e2976797..aa363d4f 100644
--- a/dubbod/planet/pkg/bootstrap/server.go
+++ b/dubbod/planet/pkg/bootstrap/server.go
@@ -47,6 +47,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/ctrlz"
"github.com/apache/dubbo-kubernetes/pkg/filewatcher"
"github.com/apache/dubbo-kubernetes/pkg/h2c"
@@ -252,8 +253,7 @@ func NewServer(args *PlanetArgs, initFuncs
...func(*Server)) (*Server, error) {
}
}
- s.initRegistryEventHandlers()
-
+ // Note: initRegistryEventHandlers is called in Start() after config
controller starts
s.initDiscoveryService()
s.startCA(caOpts)
@@ -284,6 +284,10 @@ func (s *Server) Start(stop <-chan struct{}) error {
s.XDSServer.CachesSynced()
+ // Register event handlers after config controller has started and
synced
+ // This ensures that config changes are properly detected and handled
+ s.initRegistryEventHandlers()
+
if s.secureGrpcAddress != "" {
grpcListener, err := net.Listen("tcp", s.secureGrpcAddress)
if err != nil {
@@ -424,13 +428,84 @@ func (s *Server) initSecureDiscoveryService(args
*PlanetArgs, trustDomain string
func (s *Server) initRegistryEventHandlers() {
log.Info("initializing registry event handlers")
- if s.configController != nil {
- configHandler := func(prev config.Config, curr config.Config,
event model.Event) {}
- schemas := collections.Planet.All()
- for _, schema := range schemas {
-
s.configController.RegisterEventHandler(schema.GroupVersionKind(),
configHandler)
+ if s.configController == nil {
+ log.Warnf("initRegistryEventHandlers: configController is nil,
cannot register event handlers")
+ return
+ }
+
+ log.Infof("initRegistryEventHandlers: configController is available,
registering event handlers")
+
+ configHandler := func(prev config.Config, curr config.Config, event
model.Event) {
+ // Log ALL events at INFO level to ensure visibility
+ log.Infof("configHandler: received event %s for config %v
(prev.Name=%s, curr.Name=%s, prev.Namespace=%s, curr.Namespace=%s)",
+ event, curr.GroupVersionKind, prev.Name, curr.Name,
prev.Namespace, curr.Namespace)
+
+ // Handle delete events - use prev config if curr is empty
+ cfg := curr
+ if event == model.EventDelete && curr.Name == "" {
+ cfg = prev
+ }
+
+ // Build ConfigKey for the changed config
+ // Find the schema to get the kind.Kind
+ schema, found :=
collections.Planet.FindByGroupVersionKind(cfg.GroupVersionKind)
+ if !found {
+ log.Warnf("configHandler: schema not found for %v,
skipping", cfg.GroupVersionKind)
+ return
+ }
+
+ // Map GVK to kind.Kind using schema identifier
+ // This matches Istio's approach of using gvk.MustToKind, but
we use schema.Identifier() instead
+ schemaID := schema.Identifier()
+ log.Infof("configHandler: processing config change, schema
identifier=%s, GVK=%v, name=%s/%s, event=%s",
+ schemaID, cfg.GroupVersionKind, cfg.Namespace,
cfg.Name, event)
+
+ var configKind kind.Kind
+ switch schemaID {
+ case "SubsetRule":
+ configKind = kind.SubsetRule
+ case "serviceRoute", "ServiceRoute":
+ configKind = kind.ServiceRoute
+ case "PeerAuthentication":
+ configKind = kind.PeerAuthentication
+ default:
+ log.Debugf("configHandler: unknown schema identifier %s
for %v, skipping", schemaID, cfg.GroupVersionKind)
+ return
+ }
+
+ configKey := model.ConfigKey{
+ Kind: configKind,
+ Name: cfg.Name,
+ Namespace: cfg.Namespace,
}
+
+ // Log the config change
+ log.Infof("configHandler: %s event for %s/%s/%s", event,
configKey.Kind, configKey.Namespace, configKey.Name)
+
+ // CRITICAL: For SubsetRule and ServiceRoute changes, we need
Full push to ensure
+ // PushContext is re-initialized and configuration is reloaded
+ // This is because these configs affect CDS/RDS generation and
need complete context refresh
+ needsFullPush := configKind == kind.SubsetRule || configKind ==
kind.ServiceRoute
+
+ // Trigger ConfigUpdate to push changes to all connected proxies
+ s.XDSServer.ConfigUpdate(&model.PushRequest{
+ ConfigsUpdated: sets.New(configKey),
+ Reason:
model.NewReasonStats(model.DependentResource),
+ Full: needsFullPush, // Full push for
SubsetRule/ServiceRoute to reload PushContext
+ })
}
+ schemas := collections.Planet.All()
+ log.Infof("initRegistryEventHandlers: found %d schemas to register",
len(schemas))
+ registeredCount := 0
+ for _, schema := range schemas {
+ gvk := schema.GroupVersionKind()
+ schemaID := schema.Identifier()
+ log.Infof("initRegistryEventHandlers: registering event handler
for %s (GVK: %v)", schemaID, gvk)
+ s.configController.RegisterEventHandler(gvk, configHandler)
+ registeredCount++
+ log.Infof("initRegistryEventHandlers: successfully registered
event handler for %s (GVK: %v)", schemaID, gvk)
+ }
+ log.Infof("initRegistryEventHandlers: successfully registered event
handlers for %d schemas", registeredCount)
}
func (s *Server) addReadinessProbe(name string, fn readinessProbe) {
diff --git a/dubbod/planet/pkg/bootstrap/validation.go
b/dubbod/planet/pkg/bootstrap/validation.go
index 6c53b01f..05a7df9d 100644
--- a/dubbod/planet/pkg/bootstrap/validation.go
+++ b/dubbod/planet/pkg/bootstrap/validation.go
@@ -19,6 +19,7 @@ package bootstrap
import (
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
"github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/webhooks/server"
"github.com/apache/dubbo-kubernetes/pkg/webhooks/validation/controller"
@@ -30,6 +31,7 @@ func (s *Server) initConfigValidation(args *PlanetArgs) error
{
}
log.Info("initializing config validator")
params := server.Options{
+ Schemas: collections.Planet,
DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
Mux: s.httpsMux,
}
diff --git a/dubbod/planet/pkg/config/aggregate/config.go
b/dubbod/planet/pkg/config/aggregate/config.go
index cc64232a..f90702a8 100644
--- a/dubbod/planet/pkg/config/aggregate/config.go
+++ b/dubbod/planet/pkg/config/aggregate/config.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
)
@@ -158,11 +159,19 @@ func makeStore(stores []model.ConfigStore, writer
model.ConfigStore) (model.Conf
}
func (cr *storeCache) RegisterEventHandler(kind config.GroupVersionKind,
handler model.EventHandler) {
- for _, cache := range cr.caches {
+ log := log.RegisterScope("aggregate", "aggregate config controller")
+ log.Infof("RegisterEventHandler: registering handler for %v across %d
caches", kind, len(cr.caches))
+ registeredCount := 0
+ for i, cache := range cr.caches {
if _, exists := cache.Schemas().FindByGroupVersionKind(kind);
exists {
+ log.Infof("RegisterEventHandler: registering handler
for %v on cache[%d] (type=%T)", kind, i, cache)
cache.RegisterEventHandler(kind, handler)
+ registeredCount++
+ } else {
+ log.Debugf("RegisterEventHandler: cache[%d] does not
support %v, skipping", i, kind)
}
}
+ log.Infof("RegisterEventHandler: successfully registered handler for %v
on %d caches", kind, registeredCount)
}
func (cr *storeCache) Run(stop <-chan struct{}) {
diff --git a/dubbod/planet/pkg/config/kube/crdclient/client.go
b/dubbod/planet/pkg/config/kube/crdclient/client.go
index c251efab..e4cab973 100644
--- a/dubbod/planet/pkg/config/kube/crdclient/client.go
+++ b/dubbod/planet/pkg/config/kube/crdclient/client.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
"github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
@@ -152,19 +153,20 @@ func (cl *Client) allKinds()
map[config.GroupVersionKind]nsStore {
}
func (cl *Client) addCRD(name string, opts krt.OptionsBuilder) {
- cl.logger.Debugf("adding CRD %q", name)
+ cl.logger.Infof("addCRD: adding CRD %q", name)
s, f := cl.schemasByCRDName[name]
if !f {
- cl.logger.Debugf("added resource that we are not watching: %v",
name)
+ cl.logger.Warnf("addCRD: added resource that we are not
watching: %v", name)
return
}
resourceGVK := s.GroupVersionKind()
gvr := s.GroupVersionResource()
+ cl.logger.Infof("addCRD: CRD %q maps to GVK %v, GVR %v", name,
resourceGVK, gvr)
cl.kindsMu.Lock()
defer cl.kindsMu.Unlock()
if _, f := cl.kinds[resourceGVK]; f {
- cl.logger.Debugf("added resource that already exists: %v",
resourceGVK)
+ cl.logger.Warnf("addCRD: added resource that already exists:
%v", resourceGVK)
return
}
translateFunc, f := translationMap[resourceGVK]
@@ -189,6 +191,9 @@ func (cl *Client) addCRD(name string, opts
krt.OptionsBuilder) {
var namespaceFilter kubetypes.DynamicObjectFilter
if !s.IsClusterScoped() {
namespaceFilter = cl.client.ObjectFilter()
+ cl.logger.Infof("addCRD: using namespace filter for %v (not
cluster-scoped)", resourceGVK)
+ } else {
+ cl.logger.Infof("addCRD: no namespace filter for %v
(cluster-scoped)", resourceGVK)
}
filter := kubetypes.Filter{
@@ -196,15 +201,23 @@ func (cl *Client) addCRD(name string, opts
krt.OptionsBuilder) {
ObjectTransform: transform,
FieldSelector: fieldSelector,
}
+ cl.logger.Infof("addCRD: created filter for %v (namespaceFilter=%v,
extraFilter=%v, fieldSelector=%v)", resourceGVK, namespaceFilter != nil,
extraFilter != nil, fieldSelector)
var kc kclient.Untyped
if s.IsBuiltin() {
kc = kclient.NewUntypedInformer(cl.client, gvr, filter)
} else {
+ // For SubsetRule and ServiceRoute, we use Dynamic client which
returns unstructured objects
+ // So we need to use DynamicInformer type to ensure the
informer expects unstructured objects
+ informerType := kubetypes.StandardInformer
+ if resourceGVK == gvk.SubsetRule || resourceGVK ==
gvk.ServiceRoute || resourceGVK == gvk.PeerAuthentication {
+ informerType = kubetypes.DynamicInformer
+ cl.logger.Infof("addCRD: using DynamicInformer for %v
(uses Dynamic client)", resourceGVK)
+ }
kc = kclient.NewDelayedInformer[controllers.Object](
cl.client,
gvr,
- kubetypes.StandardInformer,
+ informerType,
filter,
)
}
@@ -213,15 +226,58 @@ func (cl *Client) addCRD(name string, opts
krt.OptionsBuilder) {
collection := krt.MapCollection(wrappedClient, func(obj
controllers.Object) config.Config {
cfg := translateFunc(obj)
cfg.Domain = cl.domainSuffix
+ // Only log at Debug level to avoid spam, but keep it available
for diagnosis
+ cl.logger.Debugf("addCRD: MapCollection translating object
%s/%s to config for %v", obj.GetNamespace(), obj.GetName(), resourceGVK)
return cfg
}, opts.WithName("collection/"+resourceGVK.Kind)...)
index := krt.NewNamespaceIndex(collection)
+ // Register a debug handler to track all events from the wrappedClient
(before MapCollection)
+ // This helps diagnose if events are being filtered before reaching the
collection
+ wrappedClientDebugHandler := wrappedClient.RegisterBatch(func(o
[]krt.Event[controllers.Object]) {
+ if len(o) > 0 {
+ cl.logger.Infof("addCRD: wrappedClient event detected
for %v: %d events", resourceGVK, len(o))
+ for i, event := range o {
+ var nameStr, nsStr string
+ if event.New != nil {
+ obj := *event.New
+ nameStr = obj.GetName()
+ nsStr = obj.GetNamespace()
+ } else if event.Old != nil {
+ obj := *event.Old
+ nameStr = obj.GetName()
+ nsStr = obj.GetNamespace()
+ }
+ cl.logger.Infof("addCRD: wrappedClient
event[%d] %s for %v (name=%s/%s)",
+ i, event.Event, resourceGVK, nsStr,
nameStr)
+ }
+ }
+ }, false)
+ // Register a debug handler to track all events from the collection
+ // This helps diagnose why new config changes might not trigger events
+ // Use false to match Istio's implementation - only process future
events, not initial sync
+ debugHandler := collection.RegisterBatch(func(o
[]krt.Event[config.Config]) {
+ if len(o) > 0 {
+ cl.logger.Infof("addCRD: collection event detected for
%v: %d events", resourceGVK, len(o))
+ for i, event := range o {
+ var nameStr, nsStr string
+ if event.New != nil {
+ nameStr = event.New.Name
+ nsStr = event.New.Namespace
+ } else if event.Old != nil {
+ nameStr = event.Old.Name
+ nsStr = event.Old.Namespace
+ }
+ cl.logger.Infof("addCRD: collection event[%d]
%s for %v (name=%s/%s)",
+ i, event.Event, resourceGVK, nsStr,
nameStr)
+ }
+ }
+ }, false)
cl.kinds[resourceGVK] = nsStore{
collection: collection,
index: index,
handlers: []krt.HandlerRegistration{
- collection.RegisterBatch(func(o
[]krt.Event[config.Config]) {
- }, false),
+ wrappedClientDebugHandler,
+ debugHandler,
},
}
}
@@ -238,23 +294,59 @@ func (cl *Client) Schemas() collection.Schemas {
}
func (cl *Client) RegisterEventHandler(kind config.GroupVersionKind, handler
model.EventHandler) {
- if c, ok := cl.kind(kind); ok {
- c.handlers = append(c.handlers,
c.collection.RegisterBatch(func(o []krt.Event[config.Config]) {
- for _, event := range o {
- switch event.Event {
- case controllers.EventAdd:
+ cl.kindsMu.Lock()
+ defer cl.kindsMu.Unlock()
+
+ c, ok := cl.kinds[kind]
+ if !ok {
+ cl.logger.Warnf("unknown type: %s", kind)
+ return
+ }
+
+ cl.logger.Infof("RegisterEventHandler: registering handler for %v",
kind)
+ // Match Istio's implementation: RegisterBatch returns a
HandlerRegistration that is already
+ // registered with the collection, so we just need to append it to
handlers to keep a reference
+ // The handler will be called by the collection when events occur,
regardless of whether we
+ // update cl.kinds[kind] or not. However, we update it to keep the
handlers slice in sync.
+ handlerReg := c.collection.RegisterBatch(func(o
[]krt.Event[config.Config]) {
+ cl.logger.Infof("RegisterEventHandler: batch handler triggered
for %v with %d events", kind, len(o))
+ for i, event := range o {
+ var nameStr, nsStr string
+ if event.New != nil {
+ nameStr = event.New.Name
+ nsStr = event.New.Namespace
+ } else if event.Old != nil {
+ nameStr = event.Old.Name
+ nsStr = event.Old.Namespace
+ }
+ cl.logger.Infof("RegisterEventHandler: processing
event[%d] %s for %v (name=%s/%s)",
+ i, event.Event, kind, nsStr, nameStr)
+ switch event.Event {
+ case controllers.EventAdd:
+ if event.New != nil {
handler(config.Config{}, *event.New,
model.Event(event.Event))
- case controllers.EventUpdate:
+ } else {
+ cl.logger.Warnf("RegisterEventHandler:
EventAdd but event.New is nil, skipping")
+ }
+ case controllers.EventUpdate:
+ if event.Old != nil && event.New != nil {
handler(*event.Old, *event.New,
model.Event(event.Event))
- case controllers.EventDelete:
+ } else {
+ cl.logger.Warnf("RegisterEventHandler:
EventUpdate but event.Old or event.New is nil, skipping")
+ }
+ case controllers.EventDelete:
+ if event.Old != nil {
handler(config.Config{}, *event.Old,
model.Event(event.Event))
+ } else {
+ cl.logger.Warnf("RegisterEventHandler:
EventDelete but event.Old is nil, skipping")
}
}
- }, false))
- return
- }
-
- cl.logger.Warnf("unknown type: %s", kind)
+ }
+ }, false)
+ // Update handlers slice to keep reference (though not strictly
necessary for functionality)
+ c.handlers = append(c.handlers, handlerReg)
+ cl.kinds[kind] = c
+ cl.logger.Infof("RegisterEventHandler: successfully registered handler
for %v", kind)
}
func (cl *Client) Get(typ config.GroupVersionKind, name, namespace string)
*config.Config {
@@ -333,14 +425,43 @@ func (cl *Client) Delete(typ config.GroupVersionKind,
name, namespace string, re
func (cl *Client) List(kind config.GroupVersionKind, namespace string)
[]config.Config {
h, f := cl.kind(kind)
if !f {
+ cl.logger.Warnf("List: unknown kind %v", kind)
return nil
}
+ // Check if collection is synced
+ if !h.collection.HasSynced() {
+ cl.logger.Warnf("List: collection for %v is not synced yet",
kind)
+ }
+
+ var configs []config.Config
if namespace == metav1.NamespaceAll {
- return h.collection.List()
+ // Get all configs from collection
+ configs = h.collection.List()
+ cl.logger.Infof("List: found %d configs for %v (namespace=all,
synced=%v)",
+ len(configs), kind, h.collection.HasSynced())
+ if len(configs) > 0 {
+ for i, cfg := range configs {
+ cl.logger.Infof("List: config[%d] %s/%s for
%v", i, cfg.Namespace, cfg.Name, kind)
+ }
+ } else {
+ cl.logger.Warnf("List: collection returned 0 configs
for %v (synced=%v), this may indicate informer is not watching correctly or
resources are being filtered", kind, h.collection.HasSynced())
+ }
+ // Log collection type for diagnosis
+ cl.logger.Infof("List: collection type is %T, HasSynced=%v",
h.collection, h.collection.HasSynced())
+ } else {
+ configs = h.index.Lookup(namespace)
+ cl.logger.Infof("List: found %d configs for %v in namespace %s
(synced=%v)", len(configs), kind, namespace, h.collection.HasSynced())
+ if len(configs) > 0 {
+ for i, cfg := range configs {
+ cl.logger.Infof("List: config[%d] %s/%s for
%v", i, cfg.Namespace, cfg.Name, kind)
+ }
+ } else {
+ cl.logger.Warnf("List: found 0 configs for %v in
namespace %s (synced=%v), checking if resources exist in cluster", kind,
namespace, h.collection.HasSynced())
+ }
}
- return h.index.Lookup(namespace)
+ return configs
}
func getObjectMetadata(config config.Config) metav1.ObjectMeta {
diff --git a/dubbod/planet/pkg/config/kube/crdclient/types.go
b/dubbod/planet/pkg/config/kube/crdclient/types.go
index 9e70c241..c5d62fa9 100644
--- a/dubbod/planet/pkg/config/kube/crdclient/types.go
+++ b/dubbod/planet/pkg/config/kube/crdclient/types.go
@@ -37,7 +37,9 @@ import (
k8sioapidiscoveryv1 "k8s.io/api/discovery/v1"
k8sioapiextensionsapiserverpkgapisapiextensionsv1
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
)
@@ -53,13 +55,30 @@ func assignSpec[T any](dst *T, src *T) {
func create(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta)
(metav1.Object, error) {
switch cfg.GroupVersionKind {
case gvk.SubsetRule:
+ // SubsetRule uses networking.dubbo.apache.org API group, not
networking.istio.io
+ // Use Dynamic client to access it, but reuse Istio's
DestinationRule spec structure
spec := cfg.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
clonedSpec := protomarshal.Clone(spec)
obj := &apiistioioapinetworkingv1.DestinationRule{
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return
c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).Create(context.TODO(),
obj, metav1.CreateOptions{})
+ // Convert to unstructured for Dynamic client
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert
DestinationRule to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "SubsetRule",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }).Namespace(cfg.Namespace).Create(context.TODO(), u,
metav1.CreateOptions{})
case gvk.PeerAuthentication:
spec := cfg.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
clonedSpec := protomarshal.Clone(spec)
@@ -67,15 +86,46 @@ func create(c kube.Client, cfg config.Config, objMeta
metav1.ObjectMeta) (metav1
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return
c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).Create(context.TODO(),
obj, metav1.CreateOptions{})
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert
PeerAuthentication to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Kind: "PeerAuthentication",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }).Namespace(cfg.Namespace).Create(context.TODO(), u,
metav1.CreateOptions{})
case gvk.ServiceRoute:
+ // ServiceRoute uses networking.dubbo.apache.org API group, not
networking.istio.io
+ // Use Dynamic client to access it, but reuse Istio's
VirtualService spec structure
spec := cfg.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
clonedSpec := protomarshal.Clone(spec)
obj := &apiistioioapinetworkingv1.VirtualService{
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return
c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).Create(context.TODO(),
obj, metav1.CreateOptions{})
+ // Convert to unstructured for Dynamic client
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert
VirtualService to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "ServiceRoute",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }).Namespace(cfg.Namespace).Create(context.TODO(), u,
metav1.CreateOptions{})
default:
return nil, fmt.Errorf("unsupported type: %v",
cfg.GroupVersionKind)
}
@@ -84,13 +134,28 @@ func create(c kube.Client, cfg config.Config, objMeta
metav1.ObjectMeta) (metav1
func update(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta)
(metav1.Object, error) {
switch cfg.GroupVersionKind {
case gvk.SubsetRule:
+ // SubsetRule uses networking.dubbo.apache.org API group, use
Dynamic client
spec := cfg.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
clonedSpec := protomarshal.Clone(spec)
obj := &apiistioioapinetworkingv1.DestinationRule{
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return
c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).Update(context.TODO(),
obj, metav1.UpdateOptions{})
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert
DestinationRule to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "SubsetRule",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }).Namespace(cfg.Namespace).Update(context.TODO(), u,
metav1.UpdateOptions{})
case gvk.PeerAuthentication:
spec := cfg.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
clonedSpec := protomarshal.Clone(spec)
@@ -98,15 +163,44 @@ func update(c kube.Client, cfg config.Config, objMeta
metav1.ObjectMeta) (metav1
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return
c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).Update(context.TODO(),
obj, metav1.UpdateOptions{})
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert
PeerAuthentication to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Kind: "PeerAuthentication",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }).Namespace(cfg.Namespace).Update(context.TODO(), u,
metav1.UpdateOptions{})
case gvk.ServiceRoute:
+ // ServiceRoute uses networking.dubbo.apache.org API group, use
Dynamic client
spec := cfg.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
clonedSpec := protomarshal.Clone(spec)
obj := &apiistioioapinetworkingv1.VirtualService{
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return
c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).Update(context.TODO(),
obj, metav1.UpdateOptions{})
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert
VirtualService to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "ServiceRoute",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }).Namespace(cfg.Namespace).Update(context.TODO(), u,
metav1.UpdateOptions{})
default:
return nil, fmt.Errorf("unsupported type: %v",
cfg.GroupVersionKind)
}
@@ -115,13 +209,28 @@ func update(c kube.Client, cfg config.Config, objMeta
metav1.ObjectMeta) (metav1
func updateStatus(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta)
(metav1.Object, error) {
switch cfg.GroupVersionKind {
case gvk.SubsetRule:
+ // SubsetRule uses networking.dubbo.apache.org API group, use
Dynamic client
status := cfg.Status.(*istioioapimetav1alpha1.IstioStatus)
clonedStatus := protomarshal.Clone(status)
obj := &apiistioioapinetworkingv1.DestinationRule{
ObjectMeta: objMeta,
}
assignSpec(&obj.Status, clonedStatus)
- return
c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).UpdateStatus(context.TODO(),
obj, metav1.UpdateOptions{})
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert
DestinationRule to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "SubsetRule",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }).Namespace(cfg.Namespace).UpdateStatus(context.TODO(), u,
metav1.UpdateOptions{})
case gvk.PeerAuthentication:
status := cfg.Status.(*istioioapimetav1alpha1.IstioStatus)
clonedStatus := protomarshal.Clone(status)
@@ -129,15 +238,44 @@ func updateStatus(c kube.Client, cfg config.Config,
objMeta metav1.ObjectMeta) (
ObjectMeta: objMeta,
}
assignSpec(&obj.Status, clonedStatus)
- return
c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).UpdateStatus(context.TODO(),
obj, metav1.UpdateOptions{})
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert
PeerAuthentication status to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Kind: "PeerAuthentication",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }).Namespace(cfg.Namespace).UpdateStatus(context.TODO(), u,
metav1.UpdateOptions{})
case gvk.ServiceRoute:
+ // ServiceRoute uses networking.dubbo.apache.org API group, use
Dynamic client
status := cfg.Status.(*istioioapimetav1alpha1.IstioStatus)
clonedStatus := protomarshal.Clone(status)
obj := &apiistioioapinetworkingv1.VirtualService{
ObjectMeta: objMeta,
}
assignSpec(&obj.Status, clonedStatus)
- return
c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).UpdateStatus(context.TODO(),
obj, metav1.UpdateOptions{})
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert
VirtualService to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "ServiceRoute",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }).Namespace(cfg.Namespace).UpdateStatus(context.TODO(), u,
metav1.UpdateOptions{})
default:
return nil, fmt.Errorf("unsupported type: %v",
cfg.GroupVersionKind)
}
@@ -149,6 +287,7 @@ func patch(c kube.Client, orig config.Config, origMeta
metav1.ObjectMeta, mod co
}
switch orig.GroupVersionKind {
case gvk.SubsetRule:
+ // SubsetRule uses networking.dubbo.apache.org API group, use
Dynamic client
origSpec :=
orig.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
modSpec :=
mod.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
clonedOrigSpec := protomarshal.Clone(origSpec)
@@ -165,8 +304,11 @@ func patch(c kube.Client, orig config.Config, origMeta
metav1.ObjectMeta, mod co
if err != nil {
return nil, err
}
- return
c.Dubbo().NetworkingV1().DestinationRules(orig.Namespace).
- Patch(context.TODO(), orig.Name, typ, patchBytes,
metav1.PatchOptions{FieldManager: "planet-discovery"})
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }).Namespace(orig.Namespace).Patch(context.TODO(), orig.Name,
typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
case gvk.PeerAuthentication:
origSpec :=
orig.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
modSpec :=
mod.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
@@ -184,9 +326,13 @@ func patch(c kube.Client, orig config.Config, origMeta
metav1.ObjectMeta, mod co
if err != nil {
return nil, err
}
- return
c.Dubbo().SecurityV1().PeerAuthentications(orig.Namespace).
- Patch(context.TODO(), orig.Name, typ, patchBytes,
metav1.PatchOptions{FieldManager: "planet-discovery"})
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }).Namespace(orig.Namespace).Patch(context.TODO(), orig.Name,
typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
case gvk.ServiceRoute:
+ // ServiceRoute uses networking.dubbo.apache.org API group, use
Dynamic client
origSpec :=
orig.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
modSpec :=
mod.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
clonedOrigSpec := protomarshal.Clone(origSpec)
@@ -203,8 +349,11 @@ func patch(c kube.Client, orig config.Config, origMeta
metav1.ObjectMeta, mod co
if err != nil {
return nil, err
}
- return c.Dubbo().NetworkingV1().VirtualServices(orig.Namespace).
- Patch(context.TODO(), orig.Name, typ, patchBytes,
metav1.PatchOptions{FieldManager: "planet-discovery"})
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }).Namespace(orig.Namespace).Patch(context.TODO(), orig.Name,
typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
default:
return nil, fmt.Errorf("unsupported type: %v",
orig.GroupVersionKind)
}
@@ -217,11 +366,25 @@ func delete(c kube.Client, typ config.GroupVersionKind,
name, namespace string,
}
switch typ {
case gvk.SubsetRule:
- return
c.Dubbo().NetworkingV1().DestinationRules(namespace).Delete(context.TODO(),
name, deleteOptions)
+ // SubsetRule uses networking.dubbo.apache.org API group, use
Dynamic client
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }).Namespace(namespace).Delete(context.TODO(), name,
deleteOptions)
case gvk.PeerAuthentication:
- return
c.Dubbo().SecurityV1().PeerAuthentications(namespace).Delete(context.TODO(),
name, deleteOptions)
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }).Namespace(namespace).Delete(context.TODO(), name,
deleteOptions)
case gvk.ServiceRoute:
- return
c.Dubbo().NetworkingV1().VirtualServices(namespace).Delete(context.TODO(),
name, deleteOptions)
+ // ServiceRoute uses networking.dubbo.apache.org API group, use
Dynamic client
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }).Namespace(namespace).Delete(context.TODO(), name,
deleteOptions)
default:
return fmt.Errorf("unsupported type: %v", typ)
}
@@ -301,7 +464,32 @@ var translationMap = map[config.GroupVersionKind]func(r
runtime.Object) config.C
}
},
gvk.SubsetRule: func(r runtime.Object) config.Config {
- obj := r.(*apiistioioapinetworkingv1.DestinationRule)
+ var obj *apiistioioapinetworkingv1.DestinationRule
+ // Handle unstructured objects from Dynamic client
+ // First try to convert from unstructured, as Dynamic client
returns unstructured objects
+ // Note: r may be controllers.Object which embeds
runtime.Object, so we need to check the concrete type
+ switch v := r.(type) {
+ case *unstructured.Unstructured:
+ obj = &apiistioioapinetworkingv1.DestinationRule{}
+ if err :=
runtime.DefaultUnstructuredConverter.FromUnstructured(v.Object, obj); err !=
nil {
+ panic(fmt.Sprintf("failed to convert
unstructured to DestinationRule: %v", err))
+ }
+ case *apiistioioapinetworkingv1.DestinationRule:
+ // Handle typed objects from Istio client
+ obj = v
+ default:
+ // Fallback: try to convert any runtime.Object to
unstructured first, then to DestinationRule
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(r)
+ if err == nil {
+ u := &unstructured.Unstructured{Object: uObj}
+ obj =
&apiistioioapinetworkingv1.DestinationRule{}
+ if err :=
runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, obj); err !=
nil {
+ panic(fmt.Sprintf("failed to convert
object %T to DestinationRule: %v", r, err))
+ }
+ } else {
+ panic(fmt.Sprintf("unexpected object type for
SubsetRule: %T, expected *unstructured.Unstructured or
*apiistioioapinetworkingv1.DestinationRule, conversion error: %v", r, err))
+ }
+ }
return config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.SubsetRule,
@@ -338,7 +526,27 @@ var translationMap = map[config.GroupVersionKind]func(r
runtime.Object) config.C
}
},
gvk.PeerAuthentication: func(r runtime.Object) config.Config {
- obj := r.(*apiistioioapisecurityv1.PeerAuthentication)
+ var obj *apiistioioapisecurityv1.PeerAuthentication
+ switch v := r.(type) {
+ case *unstructured.Unstructured:
+ obj = &apiistioioapisecurityv1.PeerAuthentication{}
+ if err :=
runtime.DefaultUnstructuredConverter.FromUnstructured(v.Object, obj); err !=
nil {
+ panic(fmt.Sprintf("failed to convert
unstructured to PeerAuthentication: %v", err))
+ }
+ case *apiistioioapisecurityv1.PeerAuthentication:
+ obj = v
+ default:
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(r)
+ if err == nil {
+ u := &unstructured.Unstructured{Object: uObj}
+ obj =
&apiistioioapisecurityv1.PeerAuthentication{}
+ if err :=
runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, obj); err !=
nil {
+ panic(fmt.Sprintf("failed to convert
object %T to PeerAuthentication: %v", r, err))
+ }
+ } else {
+ panic(fmt.Sprintf("unexpected object type for
PeerAuthentication: %T, conversion error: %v", r, err))
+ }
+ }
return config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.PeerAuthentication,
@@ -466,7 +674,32 @@ var translationMap = map[config.GroupVersionKind]func(r
runtime.Object) config.C
}
},
gvk.ServiceRoute: func(r runtime.Object) config.Config {
- obj := r.(*apiistioioapinetworkingv1.VirtualService)
+ var obj *apiistioioapinetworkingv1.VirtualService
+ // Handle unstructured objects from Dynamic client
+ // First try to convert from unstructured, as Dynamic client
returns unstructured objects
+ // Note: r may be controllers.Object which embeds
runtime.Object, so we need to check the concrete type
+ switch v := r.(type) {
+ case *unstructured.Unstructured:
+ obj = &apiistioioapinetworkingv1.VirtualService{}
+ if err :=
runtime.DefaultUnstructuredConverter.FromUnstructured(v.Object, obj); err !=
nil {
+ panic(fmt.Sprintf("failed to convert
unstructured to VirtualService: %v", err))
+ }
+ case *apiistioioapinetworkingv1.VirtualService:
+ // Handle typed objects from Istio client
+ obj = v
+ default:
+ // Fallback: try to convert any runtime.Object to
unstructured first, then to VirtualService
+ uObj, err :=
runtime.DefaultUnstructuredConverter.ToUnstructured(r)
+ if err == nil {
+ u := &unstructured.Unstructured{Object: uObj}
+ obj =
&apiistioioapinetworkingv1.VirtualService{}
+ if err :=
runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, obj); err !=
nil {
+ panic(fmt.Sprintf("failed to convert
object %T to VirtualService: %v", r, err))
+ }
+ } else {
+ panic(fmt.Sprintf("unexpected object type for
ServiceRoute: %T, expected *unstructured.Unstructured or
*apiistioioapinetworkingv1.VirtualService, conversion error: %v", r, err))
+ }
+ }
return config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.ServiceRoute,
diff --git a/dubbod/planet/pkg/model/push_context.go
b/dubbod/planet/pkg/model/push_context.go
index fd8f1d8c..7990cd3b 100644
--- a/dubbod/planet/pkg/model/push_context.go
+++ b/dubbod/planet/pkg/model/push_context.go
@@ -19,12 +19,14 @@ package model
import (
"cmp"
- "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
- networking "istio.io/api/networking/v1alpha3"
"sort"
"sync"
"time"
+ "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+ networking "istio.io/api/networking/v1alpha3"
+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/serviceregistry/provider"
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/config"
@@ -139,6 +141,9 @@ type serviceRouteIndex struct {
// Map of VS hostname -> referenced hostnames
referencedDestinations map[string]sets.String
+
+ // hostToRoutes keeps the resolved VirtualServices keyed by host
+ hostToRoutes map[host.Name][]config.Config
}
type subsetRuleIndex struct {
@@ -175,6 +180,7 @@ func newServiceRouteIndex() serviceRouteIndex {
out := serviceRouteIndex{
delegates: map[ConfigKey][]ConfigKey{},
referencedDestinations: map[string]sets.String{},
+ hostToRoutes: map[host.Name][]config.Config{},
}
return out
}
@@ -505,7 +511,10 @@ func (ps *PushContext) initServiceRegistry(env
*Environment, configsUpdate sets.
}
func (ps *PushContext) createNewContext(env *Environment) {
+ log.Infof("createNewContext: creating new PushContext (full
initialization)")
ps.initServiceRegistry(env, nil)
+ ps.initServiceRoutes(env)
+ ps.initSubsetRules(env)
}
func (ps *PushContext) updateContext(env *Environment, oldPushContext
*PushContext, pushReq *PushRequest) {
@@ -518,14 +527,45 @@ func (ps *PushContext) updateContext(env *Environment,
oldPushContext *PushConte
// Check if serviceRoutes have changed base on:
// 1. ServiceRoute updates in ConfigsUpdated
- serviceRoutesChanged := pushReq != nil &&
(HasConfigsOfKind(pushReq.ConfigsUpdated, kind.ServiceRoute) ||
+ // 2. Full push (Full: true) - always re-initialize on full push
+ serviceRoutesChanged := pushReq != nil && (pushReq.Full ||
HasConfigsOfKind(pushReq.ConfigsUpdated, kind.ServiceRoute) ||
len(pushReq.AddressesUpdated) > 0)
- // Check if serviceRoutes have changed base on:
+ if pushReq != nil {
+ serviceRouteCount := 0
+ for cfg := range pushReq.ConfigsUpdated {
+ if cfg.Kind == kind.ServiceRoute {
+ serviceRouteCount++
+ }
+ }
+ if serviceRouteCount > 0 {
+ log.Infof("updateContext: detected %d ServiceRoute
config changes", serviceRouteCount)
+ }
+ }
+
+ // Check if subsetRules have changed base on:
// 1. SubsetRule updates in ConfigsUpdated
- subsetRulesChanged := pushReq != nil &&
(HasConfigsOfKind(pushReq.ConfigsUpdated, kind.SubsetRule) ||
+ // 2. Full push (Full: true) - always re-initialize on full push
+ subsetRulesChanged := pushReq != nil && (pushReq.Full ||
HasConfigsOfKind(pushReq.ConfigsUpdated, kind.SubsetRule) ||
len(pushReq.AddressesUpdated) > 0)
+ if pushReq != nil {
+ subsetRuleCount := 0
+ for cfg := range pushReq.ConfigsUpdated {
+ if cfg.Kind == kind.SubsetRule {
+ subsetRuleCount++
+ }
+ }
+ if subsetRuleCount > 0 {
+ log.Infof("updateContext: detected %d SubsetRule config
changes", subsetRuleCount)
+ }
+ if pushReq.Full {
+ log.Infof("updateContext: Full push requested, will
re-initialize SubsetRule and ServiceRoute indexes")
+ }
+ log.Debugf("updateContext: subsetRulesChanged=%v,
serviceRoutesChanged=%v, pushReq.ConfigsUpdated size=%d, Full=%v",
+ subsetRulesChanged, serviceRoutesChanged,
len(pushReq.ConfigsUpdated), pushReq != nil && pushReq.Full)
+ }
+
// Also check if the actual number of services has changed
// This handles cases where Kubernetes Services are added/removed
without ServiceEntry updates
if !servicesChanged && oldPushContext != nil {
@@ -551,14 +591,18 @@ func (ps *PushContext) updateContext(env *Environment,
oldPushContext *PushConte
}
if serviceRoutesChanged {
+ log.Infof("updateContext: ServiceRoutes changed,
re-initializing ServiceRoute index")
ps.initServiceRoutes(env)
} else {
+ log.Debugf("updateContext: ServiceRoutes unchanged, reusing old
ServiceRoute index")
ps.serviceRouteIndex = oldPushContext.serviceRouteIndex
}
if subsetRulesChanged {
+ log.Infof("updateContext: SubsetRules changed, re-initializing
SubsetRule index")
ps.initSubsetRules(env)
} else {
+ log.Debugf("updateContext: SubsetRules unchanged, reusing old
SubsetRule index")
ps.subsetRuleIndex = oldPushContext.subsetRuleIndex
}
@@ -612,15 +656,34 @@ func (ps *PushContext) GetAllServices() []*Service {
}
func (ps *PushContext) initServiceRoutes(env *Environment) {
+ log.Infof("initServiceRoutes: starting ServiceRoute initialization")
ps.serviceRouteIndex.referencedDestinations = map[string]sets.String{}
serviceroutes := env.List(gvk.ServiceRoute, NamespaceAll)
+ log.Infof("initServiceRoutes: found %d ServiceRoute configs",
len(serviceroutes))
sroutes := make([]config.Config, len(serviceroutes))
for i, r := range serviceroutes {
sroutes[i] = resolveServiceRouteShortnames(r)
+ if vs, ok := r.Spec.(*networking.VirtualService); ok {
+ log.Infof("initServiceRoutes: ServiceRoute %s/%s with
hosts %v and %d HTTP routes",
+ r.Namespace, r.Name, vs.Hosts, len(vs.Http))
+ }
}
sroutes, ps.serviceRouteIndex.delegates =
mergeServiceRoutesIfNeeded(sroutes, ps.exportToDefaults.serviceRoute)
+ hostToRoutes := make(map[host.Name][]config.Config)
+ for i := range sroutes {
+ vs := sroutes[i].Spec.(*networking.VirtualService)
+ for idx, h := range vs.Hosts {
+ resolvedHost := string(ResolveShortnameToFQDN(h,
sroutes[i].Meta))
+ vs.Hosts[idx] = resolvedHost
+ hostName := host.Name(resolvedHost)
+ hostToRoutes[hostName] = append(hostToRoutes[hostName],
sroutes[i])
+ log.Debugf("initServiceRoutes: indexed ServiceRoute
%s/%s for hostname %s", sroutes[i].Namespace, sroutes[i].Name, hostName)
+ }
+ }
+ ps.serviceRouteIndex.hostToRoutes = hostToRoutes
+ log.Infof("initServiceRoutes: indexed ServiceRoutes for %d hostnames",
len(hostToRoutes))
}
// sortConfigBySelectorAndCreationTime sorts the list of config objects based
on priority and creation time.
@@ -708,16 +771,42 @@ func (ps *PushContext) setSubsetRules(configs
[]config.Config) {
ps.subsetRuleIndex.namespaceLocal = namespaceLocalSubRules
ps.subsetRuleIndex.exportedByNamespace = exportedDestRulesByNamespace
ps.subsetRuleIndex.rootNamespaceLocal = rootNamespaceLocalDestRules
+
+ // Log indexing results
+ log.Infof("setSubsetRules: indexed %d namespaces with local rules",
len(namespaceLocalSubRules))
+ for ns, rules := range namespaceLocalSubRules {
+ totalRules := 0
+ for _, ruleList := range rules.specificSubRules {
+ totalRules += len(ruleList)
+ }
+ log.Infof("setSubsetRules: namespace %s has %d DestinationRules
with %d specific hostnames", ns, totalRules, len(rules.specificSubRules))
+ for hostname := range rules.specificSubRules {
+ log.Debugf("setSubsetRules: namespace %s has rules for
hostname %s", ns, hostname)
+ }
+ }
+ log.Infof("setSubsetRules: indexed %d namespaces with exported rules",
len(exportedDestRulesByNamespace))
+ if rootNamespaceLocalDestRules != nil {
+ totalRootRules := 0
+ for _, ruleList := range
rootNamespaceLocalDestRules.specificSubRules {
+ totalRootRules += len(ruleList)
+ }
+ log.Infof("setSubsetRules: root namespace has %d
DestinationRules with %d specific hostnames", totalRootRules,
len(rootNamespaceLocalDestRules.specificSubRules))
+ }
}
func (ps *PushContext) initSubsetRules(env *Environment) {
configs := env.List(gvk.SubsetRule, NamespaceAll)
+ log.Infof("initSubsetRules: found %d SubsetRule configs", len(configs))
// values returned from ConfigStore.List are immutable.
// Therefore, we make a copy
subRules := make([]config.Config, len(configs))
for i := range subRules {
subRules[i] = configs[i]
+ if dr, ok := configs[i].Spec.(*networking.DestinationRule); ok {
+ log.Infof("initSubsetRules: SubsetRule %s/%s for host
%s with %d subsets",
+ configs[i].Namespace, configs[i].Name, dr.Host,
len(dr.Subsets))
+ }
}
ps.setSubsetRules(subRules)
@@ -749,6 +838,103 @@ func (ps *PushContext) initServiceAccounts(env
*Environment, services []*Service
}
}
+// ServiceRouteForHost returns the first ServiceRoute (VirtualService) that
matches the given host.
+func (ps *PushContext) ServiceRouteForHost(hostname host.Name)
*networking.VirtualService {
+ routes := ps.serviceRouteIndex.hostToRoutes[hostname]
+ if len(routes) == 0 {
+ log.Debugf("ServiceRouteForHost: no ServiceRoute found for
hostname %s", hostname)
+ return nil
+ }
+ if vs, ok := routes[0].Spec.(*networking.VirtualService); ok {
+ log.Infof("ServiceRouteForHost: found ServiceRoute %s/%s for
hostname %s with %d HTTP routes",
+ routes[0].Namespace, routes[0].Name, hostname,
len(vs.Http))
+ return vs
+ }
+ log.Warnf("ServiceRouteForHost: ServiceRoute %s/%s for hostname %s is
not a VirtualService",
+ routes[0].Namespace, routes[0].Name, hostname)
+ return nil
+}
+
+// DestinationRuleForService returns the first DestinationRule (SubsetRule)
applicable to the service hostname/namespace.
+func (ps *PushContext) DestinationRuleForService(namespace string, hostname
host.Name) *networking.DestinationRule {
+ log.Debugf("DestinationRuleForService: looking for DestinationRule for
%s/%s", namespace, hostname)
+
+ // Check namespace-local rules first
+ if nsRules := ps.subsetRuleIndex.namespaceLocal[namespace]; nsRules !=
nil {
+ log.Debugf("DestinationRuleForService: checking namespace-local
rules for %s (found %d specific rules)", namespace,
len(nsRules.specificSubRules))
+ if dr := firstDestinationRule(nsRules, hostname); dr != nil {
+ log.Infof("DestinationRuleForService: found
DestinationRule in namespace-local index for %s/%s with %d subsets", namespace,
hostname, len(dr.Subsets))
+ return dr
+ }
+ } else {
+ log.Debugf("DestinationRuleForService: no namespace-local rules
for namespace %s", namespace)
+ }
+
+ // Check exported rules
+ log.Debugf("DestinationRuleForService: checking exported rules (found
%d exported namespaces)", len(ps.subsetRuleIndex.exportedByNamespace))
+ for ns, exported := range ps.subsetRuleIndex.exportedByNamespace {
+ if dr := firstDestinationRule(exported, hostname); dr != nil {
+ log.Infof("DestinationRuleForService: found
DestinationRule in exported rules from namespace %s for %s/%s with %d subsets",
ns, namespace, hostname, len(dr.Subsets))
+ return dr
+ }
+ }
+
+ // Finally, check root namespace scoped rules
+ if rootRules := ps.subsetRuleIndex.rootNamespaceLocal; rootRules != nil
{
+ log.Debugf("DestinationRuleForService: checking root namespace
rules (found %d specific rules)", len(rootRules.specificSubRules))
+ if dr := firstDestinationRule(rootRules, hostname); dr != nil {
+ log.Infof("DestinationRuleForService: found
DestinationRule in root namespace for %s/%s with %d subsets", namespace,
hostname, len(dr.Subsets))
+ return dr
+ }
+ }
+
+ log.Warnf("DestinationRuleForService: no DestinationRule found for
%s/%s", namespace, hostname)
+ return nil
+}
+
+// SubsetLabelsForHost returns the label selector for a subset defined in
DestinationRule.
+func (ps *PushContext) SubsetLabelsForHost(namespace string, hostname
host.Name, subset string) labels.Instance {
+ if subset == "" {
+ return nil
+ }
+ rule := ps.DestinationRuleForService(namespace, hostname)
+ if rule == nil {
+ return nil
+ }
+ for _, ss := range rule.Subsets {
+ if ss.Name == subset {
+ return labels.Instance(ss.Labels)
+ }
+ }
+ return nil
+}
+
+func firstDestinationRule(csr *consolidatedSubRules, hostname host.Name)
*networking.DestinationRule {
+ if csr == nil {
+ log.Debugf("firstDestinationRule: consolidatedSubRules is nil
for hostname %s", hostname)
+ return nil
+ }
+ if rules := csr.specificSubRules[hostname]; len(rules) > 0 {
+ log.Debugf("firstDestinationRule: found %d rules for hostname
%s", len(rules), hostname)
+ if dr, ok := rules[0].rule.Spec.(*networking.DestinationRule);
ok {
+ log.Debugf("firstDestinationRule: successfully cast to
DestinationRule for hostname %s", hostname)
+ return dr
+ } else {
+ log.Warnf("firstDestinationRule: failed to cast rule to
DestinationRule for hostname %s", hostname)
+ }
+ } else {
+ log.Debugf("firstDestinationRule: no specific rules found for
hostname %s (available hostnames: %v)", hostname, func() []string {
+ hosts := make([]string, 0, len(csr.specificSubRules))
+ for h := range csr.specificSubRules {
+ hosts = append(hosts, string(h))
+ }
+ return hosts
+ }())
+ }
+ // TODO: support wildcard hosts
+ return nil
+}
+
func (ps *PushContext) DelegateServiceRoutes(vses []config.Config)
[]ConfigHash {
var out []ConfigHash
for _, vs := range vses {
diff --git a/dubbod/planet/pkg/model/serviceroute.go
b/dubbod/planet/pkg/model/serviceroute.go
index 46a2819f..e455b7e3 100644
--- a/dubbod/planet/pkg/model/serviceroute.go
+++ b/dubbod/planet/pkg/model/serviceroute.go
@@ -1,13 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package model
import (
+ "strings"
+
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/config/visibility"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "google.golang.org/protobuf/proto"
networking "istio.io/api/networking/v1alpha3"
"k8s.io/apimachinery/pkg/types"
- "strings"
)
func resolveServiceRouteShortnames(config config.Config) config.Config {
@@ -209,14 +228,21 @@ func mergeHTTPMatchRequests(root, delegate
[]*networking.HTTPMatchRequest) (out
}
func mergeHTTPMatchRequest(root, delegate *networking.HTTPMatchRequest)
*networking.HTTPMatchRequest {
- // nolint: govet
- out := *delegate
+ cloned := proto.Clone(delegate)
+ out, ok := cloned.(*networking.HTTPMatchRequest)
+ if !ok {
+ log.Warnf("mergeHTTPMatchRequest: failed to clone
HTTPMatchRequest for delegate %s", delegate.GetName())
+ return nil
+ }
+ if out == nil {
+ return nil
+ }
if out.Name == "" {
out.Name = root.Name
} else if root.Name != "" {
out.Name = root.Name + "-" + out.Name
}
- return &out
+ return out
}
func hasConflict(root, leaf *networking.HTTPMatchRequest) bool {
diff --git a/dubbod/planet/pkg/model/subsetrule.go
b/dubbod/planet/pkg/model/subsetrule.go
index f6dab28a..954b8cde 100644
--- a/dubbod/planet/pkg/model/subsetrule.go
+++ b/dubbod/planet/pkg/model/subsetrule.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package model
import (
diff --git a/dubbod/planet/pkg/networking/grpcgen/cds.go
b/dubbod/planet/pkg/networking/grpcgen/cds.go
index 4a3676b8..3efff2bb 100644
--- a/dubbod/planet/pkg/networking/grpcgen/cds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/cds.go
@@ -19,6 +19,7 @@ package grpcgen
import (
"fmt"
+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@@ -113,7 +114,8 @@ func newClusterBuilder(node *model.Proxy, push
*model.PushContext, defaultCluste
func (b *clusterBuilder) build() []*cluster.Cluster {
var defaultCluster *cluster.Cluster
- if b.filter.Contains(b.defaultClusterName) {
+ defaultRequested := b.filter == nil ||
b.filter.Contains(b.defaultClusterName)
+ if defaultRequested {
defaultCluster = b.edsCluster(b.defaultClusterName)
// CRITICAL: For gRPC proxyless, we need to set CommonLbConfig
to handle endpoint health status
// Following Istio's implementation, we should include
UNHEALTHY and DRAINING endpoints
@@ -136,6 +138,7 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
core.HealthStatus_DEGRADED,
},
}
+ log.Infof("clusterBuilder.build: generated default cluster %s",
b.defaultClusterName)
}
subsetClusters := b.applyDestinationRule(defaultCluster)
@@ -143,7 +146,10 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
if defaultCluster != nil {
out = append(out, defaultCluster)
}
- return append(out, subsetClusters...)
+ result := append(out, subsetClusters...)
+ log.Infof("clusterBuilder.build: generated %d clusters total (1 default
+ %d subsets) for %s",
+ len(result), len(subsetClusters), b.defaultClusterName)
+ return result
}
func (b *clusterBuilder) edsCluster(name string) *cluster.Cluster {
@@ -167,8 +173,71 @@ func (b *clusterBuilder) edsCluster(name string)
*cluster.Cluster {
func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster)
(subsetClusters []*cluster.Cluster) {
if b.svc == nil || b.port == nil {
+ log.Warnf("applyDestinationRule: service or port is nil for
%s", b.defaultClusterName)
+ return nil
+ }
+ log.Infof("applyDestinationRule: looking for DestinationRule for
service %s/%s (hostname=%s, port=%d)",
+ b.svc.Attributes.Namespace, b.svc.Attributes.Name, b.hostname,
b.portNum)
+ dr := b.push.DestinationRuleForService(b.svc.Attributes.Namespace,
b.hostname)
+ if dr == nil {
+ log.Warnf("applyDestinationRule: no DestinationRule found for
%s/%s", b.svc.Attributes.Namespace, b.hostname)
return nil
}
- // TODO
- return
+ if len(dr.Subsets) == 0 {
+ log.Warnf("applyDestinationRule: DestinationRule found for
%s/%s but has no subsets", b.svc.Attributes.Namespace, b.hostname)
+ return nil
+ }
+
+ log.Infof("applyDestinationRule: found DestinationRule for %s/%s with
%d subsets, defaultCluster requested=%v",
+ b.svc.Attributes.Namespace, b.hostname, len(dr.Subsets),
defaultCluster != nil)
+
+ var commonLbConfig *cluster.Cluster_CommonLbConfig
+ if defaultCluster != nil {
+ commonLbConfig = defaultCluster.CommonLbConfig
+ } else {
+ commonLbConfig = &cluster.Cluster_CommonLbConfig{
+ OverrideHostStatus: &core.HealthStatusSet{
+ Statuses: []core.HealthStatus{
+ core.HealthStatus_HEALTHY,
+ core.HealthStatus_UNHEALTHY,
+ core.HealthStatus_DRAINING,
+ core.HealthStatus_UNKNOWN,
+ core.HealthStatus_DEGRADED,
+ },
+ },
+ }
+ }
+
+ defaultClusterRequested := defaultCluster != nil
+ if b.filter != nil {
+ defaultClusterRequested =
b.filter.Contains(b.defaultClusterName)
+ }
+
+ for _, subset := range dr.Subsets {
+ if subset == nil || subset.Name == "" {
+ continue
+ }
+ clusterName :=
model.BuildSubsetKey(model.TrafficDirectionOutbound, subset.Name, b.hostname,
b.portNum)
+
+ // CRITICAL: Always generate subset clusters if default cluster
is requested
+ // This is essential for RDS WeightedCluster to work correctly
+ shouldGenerate := true
+ if b.filter != nil && !b.filter.Contains(clusterName) {
+ // Subset cluster not explicitly requested, but
generate it if default cluster was requested
+ shouldGenerate = defaultClusterRequested
+ }
+
+ if !shouldGenerate {
+ log.Debugf("applyDestinationRule: skipping subset
cluster %s (not requested and default not requested)", clusterName)
+ continue
+ }
+
+ log.Infof("applyDestinationRule: generating subset cluster %s
for subset %s", clusterName, subset.Name)
+ subsetCluster := b.edsCluster(clusterName)
+ subsetCluster.CommonLbConfig = commonLbConfig
+ subsetClusters = append(subsetClusters, subsetCluster)
+ }
+
+ log.Infof("applyDestinationRule: generated %d subset clusters for
%s/%s", len(subsetClusters), b.svc.Attributes.Namespace, b.hostname)
+ return subsetClusters
}
diff --git a/dubbod/planet/pkg/networking/grpcgen/rds.go
b/dubbod/planet/pkg/networking/grpcgen/rds.go
index 37c74b57..11dcf120 100644
--- a/dubbod/planet/pkg/networking/grpcgen/rds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/rds.go
@@ -19,13 +19,17 @@ package grpcgen
import (
"fmt"
- "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
- discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"strconv"
"strings"
+ "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
+ discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+ "google.golang.org/protobuf/types/known/wrapperspb"
+ networking "istio.io/api/networking/v1alpha3"
)
func (g *GrpcConfigGenerator) BuildHTTPRoutes(node *model.Proxy, push
*model.PushContext, routeNames []string) model.Resources {
@@ -77,28 +81,28 @@ func buildHTTPRoute(node *model.Proxy, push
*model.PushContext, routeName string
}
domains = append(domains, "*") // Wildcard for any domain -
LEAST SPECIFIC
+ outboundRoutes := []*route.Route{
+ defaultSingleClusterRoute(routeName),
+ }
+ if vs := push.ServiceRouteForHost(host.Name(hostStr)); vs !=
nil {
+ log.Infof("buildHTTPRoute: found ServiceRoute for host
%s with %d HTTP routes", hostStr, len(vs.Http))
+ if routes := buildRoutesFromServiceRoute(vs,
host.Name(hostStr), parsedPort); len(routes) > 0 {
+ log.Infof("buildHTTPRoute: built %d weighted
routes from ServiceRoute for host %s", len(routes), hostStr)
+ outboundRoutes = routes
+ } else {
+ log.Warnf("buildHTTPRoute: ServiceRoute found
but no routes built for host %s", hostStr)
+ }
+ } else {
+ log.Debugf("buildHTTPRoute: no ServiceRoute found for
host %s, using default route", hostStr)
+ }
+
return &route.RouteConfiguration{
Name: routeName,
VirtualHosts: []*route.VirtualHost{
{
Name: fmt.Sprintf("%s|http|%d",
hostStr, parsedPort),
Domains: domains,
- Routes: []*route.Route{
- {
- Match:
&route.RouteMatch{
- PathSpecifier:
&route.RouteMatch_Prefix{
- Prefix:
"/",
- },
- },
- Action:
&route.Route_Route{
- Route:
&route.RouteAction{
-
ClusterSpecifier: &route.RouteAction_Cluster{
-
Cluster: routeName, // Use routeName (cluster name)
- },
- },
- },
- },
- },
+ Routes: outboundRoutes,
},
},
}
@@ -126,3 +130,110 @@ func buildHTTPRoute(node *model.Proxy, push
*model.PushContext, routeName string
},
}
}
+
+func defaultSingleClusterRoute(clusterName string) *route.Route {
+ return &route.Route{
+ Match: &route.RouteMatch{
+ PathSpecifier: &route.RouteMatch_Prefix{
+ Prefix: "/",
+ },
+ },
+ Action: &route.Route_Route{
+ Route: &route.RouteAction{
+ ClusterSpecifier: &route.RouteAction_Cluster{
+ Cluster: clusterName,
+ },
+ },
+ },
+ }
+}
+
+func buildRoutesFromServiceRoute(vs *networking.VirtualService, hostName
host.Name, defaultPort int) []*route.Route {
+ if vs == nil || len(vs.Http) == 0 {
+ return nil
+ }
+ var routes []*route.Route
+ for _, httpRoute := range vs.Http {
+ if httpRoute == nil {
+ continue
+ }
+ if built := buildRouteFromHTTPRoute(httpRoute, hostName,
defaultPort); built != nil {
+ routes = append(routes, built)
+ }
+ }
+ return routes
+}
+
+func buildRouteFromHTTPRoute(httpRoute *networking.HTTPRoute, hostName
host.Name, defaultPort int) *route.Route {
+ if httpRoute == nil || len(httpRoute.Route) == 0 {
+ log.Warnf("buildRouteFromHTTPRoute: httpRoute is nil or has no
routes")
+ return nil
+ }
+ log.Infof("buildRouteFromHTTPRoute: processing HTTPRoute with %d route
destinations", len(httpRoute.Route))
+ weights := make([]*route.WeightedCluster_ClusterWeight, 0,
len(httpRoute.Route))
+ var totalWeight uint32
+ for i, dest := range httpRoute.Route {
+ if dest == nil {
+ log.Warnf("buildRouteFromHTTPRoute: route[%d] is nil",
i)
+ continue
+ }
+ destination := dest.Destination
+ if destination == nil {
+ log.Warnf("buildRouteFromHTTPRoute: route[%d] has nil
Destination (weight=%d), creating default destination with host=%s",
+ i, dest.Weight, hostName)
+ destination = &networking.Destination{
+ Host: string(hostName),
+ }
+ } else {
+ log.Debugf("buildRouteFromHTTPRoute: route[%d]
Destination: host=%s, subset=%s, port=%v, weight=%d",
+ i, destination.Host, destination.Subset,
destination.Port, dest.Weight)
+ }
+ targetHost := destination.Host
+ if targetHost == "" {
+ targetHost = string(hostName)
+ }
+ targetPort := defaultPort
+ if destination.Port != nil && destination.Port.Number != 0 {
+ targetPort = int(destination.Port.Number)
+ }
+ subsetName := destination.Subset
+ clusterName :=
model.BuildSubsetKey(model.TrafficDirectionOutbound, subsetName,
host.Name(targetHost), targetPort)
+ weight := dest.Weight
+ if weight <= 0 {
+ weight = 1
+ }
+ totalWeight += uint32(weight)
+ log.Infof("buildRouteFromHTTPRoute: route[%d] -> cluster=%s,
subset=%s, weight=%d, host=%s, port=%d",
+ i, clusterName, subsetName, weight, targetHost,
targetPort)
+ weights = append(weights, &route.WeightedCluster_ClusterWeight{
+ Name: clusterName,
+ Weight: wrapperspb.UInt32(uint32(weight)),
+ })
+ }
+ if len(weights) == 0 {
+ log.Warnf("buildRouteFromHTTPRoute: no valid weights generated")
+ return nil
+ }
+ weightedClusters := &route.WeightedCluster{
+ Clusters: weights,
+ }
+ if totalWeight > 0 {
+ weightedClusters.TotalWeight = wrapperspb.UInt32(totalWeight)
+ }
+ log.Infof("buildRouteFromHTTPRoute: built WeightedCluster with %d
clusters, totalWeight=%d", len(weights), totalWeight)
+
+ return &route.Route{
+ Match: &route.RouteMatch{
+ PathSpecifier: &route.RouteMatch_Prefix{
+ Prefix: "/",
+ },
+ },
+ Action: &route.Route_Route{
+ Route: &route.RouteAction{
+ ClusterSpecifier:
&route.RouteAction_WeightedClusters{
+ WeightedClusters: weightedClusters,
+ },
+ },
+ },
+ }
+}
diff --git a/dubbod/planet/pkg/xds/ads.go b/dubbod/planet/pkg/xds/ads.go
index 3ea03210..8a598f72 100644
--- a/dubbod/planet/pkg/xds/ads.go
+++ b/dubbod/planet/pkg/xds/ads.go
@@ -19,12 +19,13 @@ package xds
import (
"fmt"
- "github.com/apache/dubbo-kubernetes/pkg/maps"
"strconv"
"strings"
"sync/atomic"
"time"
+ "github.com/apache/dubbo-kubernetes/pkg/maps"
+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
v3 "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/xds/v3"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
@@ -332,15 +333,12 @@ func (s *DiscoveryServer) processRequest(req
*discovery.DiscoveryRequest, con *C
resourceNamesStr = " [wildcard]"
}
+ // Always log at INFO so手工调用 grpcurl 也能看到完整请求轨迹
if shouldRespond {
- // Log NEW requests at INFO level - these are triggered by
grpcurl requests
- // This makes it easy to see when a grpcurl request triggers
xDS configuration
log.Infof("%s: REQ %s resources:%d nonce:%s%s (will respond)",
stype,
con.ID(), len(req.ResourceNames), req.ResponseNonce,
resourceNamesStr)
} else {
- // Log ACK/ignored requests at DEBUG level to reduce noise
- // These are normal XDS protocol ACKs, not new requests from
grpcurl
- log.Debugf("%s: REQ %s resources:%d nonce:%s%s (ACK/ignored)",
stype,
+ log.Infof("%s: REQ %s resources:%d nonce:%s%s (ACK/ignored)",
stype,
con.ID(), len(req.ResourceNames), req.ResponseNonce,
resourceNamesStr)
}
diff --git a/dubbod/planet/pkg/xds/delta.go b/dubbod/planet/pkg/xds/delta.go
index c75a7b2f..18d0260f 100644
--- a/dubbod/planet/pkg/xds/delta.go
+++ b/dubbod/planet/pkg/xds/delta.go
@@ -19,10 +19,11 @@ package xds
import (
"errors"
- dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
"strings"
"time"
+ dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
+
dubbogrpc "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/grpc"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
v3 "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/xds/v3"
@@ -67,12 +68,16 @@ func (s *DiscoveryServer) StreamDeltas(stream
DeltaDiscoveryStream) error {
return status.Errorf(codes.ResourceExhausted, "request rate
limit exceeded: %v", err)
}
- // TODO authenticate
+ ids, err := s.authenticate(ctx)
+ if err != nil {
+ return status.Error(codes.Unauthenticated, err.Error())
+ }
s.globalPushContext().InitContext(s.Env, nil, nil)
con := newDeltaConnection(peerAddr, stream)
+ con.s = s
- go s.receiveDelta(con, nil)
+ go s.receiveDelta(con, ids)
<-con.InitializedCh()
@@ -154,6 +159,18 @@ func (s *DiscoveryServer) receiveDelta(con *Connection,
identities []string) {
deltaLog.Infof("new delta connection for node:%s",
con.ID())
}
+ subscribeStr := " [wildcard]"
+ if len(req.ResourceNamesSubscribe) > 0 {
+ subscribeStr = " [" +
strings.Join(req.ResourceNamesSubscribe, ", ") + "]"
+ }
+ unsubscribeStr := ""
+ if len(req.ResourceNamesUnsubscribe) > 0 {
+ unsubscribeStr = " unsubscribe:[" +
strings.Join(req.ResourceNamesUnsubscribe, ", ") + "]"
+ }
+ deltaLog.Infof("%s: RAW DELTA REQ %s sub:%d%s nonce:%s%s",
+ v3.GetShortType(req.TypeUrl), con.ID(),
len(req.ResourceNamesSubscribe), subscribeStr,
+ req.ResponseNonce, unsubscribeStr)
+
select {
case con.deltaReqChan <- req:
case <-con.deltaStream.Context().Done():
diff --git a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
index 92218554..e18b9190 100644
--- a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
+++ b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
@@ -273,14 +273,18 @@ func (b *EndpointBuilder) servicePort(port int)
*model.Port {
}
func (b *EndpointBuilder) matchesSubset(epLabels labels.Instance) bool {
- // TODO: implement subset matching logic based on SubsetRule
- // For now, return true if no subset is specified or if subset is empty
if b.subsetName == "" {
return true
}
- // Simplified subset matching - in real implementation, this should
match
- // against SubsetRule subset labels
- return true
+ if b.service == nil || b.push == nil {
+ return true
+ }
+ selector := b.push.SubsetLabelsForHost(b.service.Attributes.Namespace,
b.hostname, b.subsetName)
+ if len(selector) == 0 {
+ // No subset labels defined, treat as match-all
+ return true
+ }
+ return selector.SubsetOf(epLabels)
}
func (b *EndpointBuilder) buildLbEndpoint(ep *model.DubboEndpoint)
*endpoint.LbEndpoint {
diff --git a/dubbod/planet/pkg/xds/xdsgen.go b/dubbod/planet/pkg/xds/xdsgen.go
index 318ba13a..54e7f1aa 100644
--- a/dubbod/planet/pkg/xds/xdsgen.go
+++ b/dubbod/planet/pkg/xds/xdsgen.go
@@ -580,6 +580,26 @@ func (s *DiscoveryServer) pushDeltaXds(con *Connection, w
*model.WatchedResource
resp.RemovedResources = sets.SortedList(removed)
}
var newResourceNames sets.String
+ if shouldSetWatchedResources(w) {
+ if usedDelta {
+ if w.ResourceNames != nil {
+ newResourceNames = w.ResourceNames.Copy()
+ } else {
+ newResourceNames = sets.New[string]()
+ }
+ for _, removed := range resp.RemovedResources {
+ newResourceNames.Delete(removed)
+ }
+ for _, r := range res {
+ newResourceNames.Insert(r.Name)
+ }
+ } else {
+ newResourceNames = resourceNamesSet(res)
+ }
+ }
+ if neverRemoveDelta(w.TypeUrl) {
+ resp.RemovedResources = nil
+ }
if len(resp.RemovedResources) > 0 {
deltaLog.Infof("%v REMOVE for node:%s %v",
v3.GetShortType(w.TypeUrl), con.ID(), resp.RemovedResources)
}
@@ -632,6 +652,36 @@ func (s *DiscoveryServer) findGenerator(typeURL string,
con *Connection) model.X
return g
}
+func resourceNamesSet(res model.Resources) sets.String {
+ names := sets.New[string]()
+ for _, r := range res {
+ if r != nil {
+ names.Insert(r.Name)
+ }
+ }
+ return names
+}
+
+func shouldSetWatchedResources(w *model.WatchedResource) bool {
+ if w == nil {
+ return false
+ }
+ if requiresResourceNamesModification(w.TypeUrl) {
+ return false
+ }
+ return xds.IsWildcardTypeURL(w.TypeUrl)
+}
+
+func requiresResourceNamesModification(typeURL string) bool {
+ return typeURL == v3.AddressType
+}
+
+func neverRemoveDelta(typeURL string) bool {
+ // Align with Envoy bug https://github.com/envoyproxy/envoy/issues/32823
+ // Skip removals for ExtensionConfiguration to avoid flapping.
+ return typeURL == v3.ExtensionConfigurationType
+}
+
// extractRouteNamesFromLDS extracts route names referenced in LDS listener
resources
// For outbound listeners with ApiListener, the route name is the
RouteConfigName from Rds config
// Route name format is the same as cluster name: "outbound|port||hostname"
diff --git a/manifests/charts/base/files/crd-all.yaml
b/manifests/charts/base/files/crd-all.yaml
index 6e55e0cc..9f82a97e 100644
--- a/manifests/charts/base/files/crd-all.yaml
+++ b/manifests/charts/base/files/crd-all.yaml
@@ -197,7 +197,7 @@ spec:
redirect or forward (default) traffic.
items:
properties:
- subset:
+ destination:
description: Subset uniquely identifies the
instances
of a service to which the request/connection
should
be forwarded to.
@@ -225,7 +225,7 @@ spec:
format: int32
type: integer
required:
- - subset
+ - destination
type: object
type: array
type: object
diff --git
a/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
b/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
index 3528c947..2efb6f52 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
@@ -7,6 +7,9 @@ rules:
- apiGroups: [ "security.istio.io", "networking.istio.io" ]
verbs: [ "get", "watch", "list" ]
resources: [ "*" ]
+ - apiGroups: [ "security.dubbo.apache.org", "networking.dubbo.apache.org" ]
+ verbs: [ "get", "watch", "list" ]
+ resources: [ "*" ]
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["mutatingwebhookconfigurations"]
verbs: ["get", "list", "watch", "update", "patch"]
diff --git
a/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml
b/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml
index 18c3d772..34c303bd 100644
---
a/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml
+++
b/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml
@@ -4,24 +4,37 @@ kind: ValidatingWebhookConfiguration
metadata:
name: dubbo-validator-dubbo-system
namespace: default
+ labels:
+ app: dubbod
+ dubbo.apache.org/rev: {{ .Values.revision | default "default" }}
webhooks:
- - name: validation.dubbo.apache.org
+ - name: rev.validation.dubbo.apache.org
+ admissionReviewVersions: ["v1"]
clientConfig:
service:
name: dubbod
namespace: dubbo-system
path: "/validate"
+ caBundle: ""
+ failurePolicy: Ignore
+ objectSelector:
+ matchExpressions:
+ - key: dubbo.apache.org/rev
+ operator: In
+ values:
+ - default
rules:
- - operations:
- - CREATE
- - UPDATE
- apiGroups:
- - security.istio.io
- - networking.istio.io
- apiVersions:
- - "*"
- resources:
- - "*"
+ - operations:
+ - CREATE
+ - UPDATE
+ apiGroups:
+ - security.istio.io
+ - networking.istio.io
+ - security.dubbo.apache.org
+ - networking.dubbo.apache.org
+ apiVersions:
+ - "*"
+ resources:
+ - "*"
sideEffects: None
- admissionReviewVersions: ["v1"]
diff --git a/pkg/config/schema/collections/collections.agent.go
b/pkg/config/schema/collections/collections.agent.go
index b7a6a894..60bb62de 100644
--- a/pkg/config/schema/collections/collections.agent.go
+++ b/pkg/config/schema/collections/collections.agent.go
@@ -49,7 +49,7 @@ var (
Identifier: "SubsetRule",
Group: "networking.dubbo.apache.org",
Kind: "SubsetRule",
- Plural: "destinationrules",
+ Plural: "subsetrules",
Version: "v1",
VersionAliases: []string{},
Proto: "istio.networking.v1alpha3.DestinationRule",
StatusProto: "istio.meta.v1alpha1.IstioStatus",
@@ -60,9 +60,9 @@ var (
Builtin: false,
}.MustBuild()
ServiceRoute = collection.Builder{
- Identifier: "serviceRoute",
+ Identifier: "ServiceRoute",
Group: "networking.dubbo.apache.org",
- Kind: "serviceRoute",
+ Kind: "ServiceRoute",
Plural: "serviceroutes",
Version: "v1",
VersionAliases: []string{},
diff --git a/pkg/config/schema/collections/collections.go
b/pkg/config/schema/collections/collections.go
index a144c236..49a15f41 100644
--- a/pkg/config/schema/collections/collections.go
+++ b/pkg/config/schema/collections/collections.go
@@ -49,7 +49,7 @@ var (
Identifier: "SubsetRule",
Group: "networking.dubbo.apache.org",
Kind: "SubsetRule",
- Plural: "destinationrules",
+ Plural: "subsetrules",
Version: "v1",
VersionAliases: []string{},
Proto: "istio.networking.v1alpha3.DestinationRule",
StatusProto: "istio.meta.v1alpha1.IstioStatus",
@@ -60,9 +60,9 @@ var (
Builtin: false,
}.MustBuild()
ServiceRoute = collection.Builder{
- Identifier: "serviceRoute",
+ Identifier: "ServiceRoute",
Group: "networking.dubbo.apache.org",
- Kind: "serviceRoute",
+ Kind: "ServiceRoute",
Plural: "serviceroutes",
Version: "v1",
VersionAliases: []string{},
diff --git a/pkg/config/schema/gvk/resources.go
b/pkg/config/schema/gvk/resources.go
index cf3828cd..a55a3f4e 100644
--- a/pkg/config/schema/gvk/resources.go
+++ b/pkg/config/schema/gvk/resources.go
@@ -40,7 +40,7 @@ var (
MeshConfig = config.GroupVersionKind{Group: "",
Version: "v1alpha1", Kind: "MeshConfig"}
PeerAuthentication = config.GroupVersionKind{Group:
"security.dubbo.apache.org", Version: "v1", Kind: "PeerAuthentication"}
SubsetRule = config.GroupVersionKind{Group:
"networking.dubbo.apache.org", Version: "v1", Kind: "SubsetRule"}
- ServiceRoute = config.GroupVersionKind{Group:
"networking.dubbo.apache.org", Version: "v1", Kind: "serviceRoute"}
+ ServiceRoute = config.GroupVersionKind{Group:
"networking.dubbo.apache.org", Version: "v1", Kind: "ServiceRoute"}
EndpointSlice = config.GroupVersionKind{Group:
"discovery.k8s.io", Version: "v1", Kind: "EndpointSlice"}
Endpoints = config.GroupVersionKind{Group: "",
Version: "v1", Kind: "Endpoints"}
)
diff --git a/pkg/config/schema/kind/resources.go
b/pkg/config/schema/kind/resources.go
index 2fdccc3e..602ca515 100644
--- a/pkg/config/schema/kind/resources.go
+++ b/pkg/config/schema/kind/resources.go
@@ -68,7 +68,7 @@ func (k Kind) String() string {
case PeerAuthentication:
return "PeerAuthentication"
case ServiceRoute:
- return "serviceRoute"
+ return "ServiceRoute"
case SubsetRule:
return "SubsetRule"
default:
diff --git a/pkg/config/schema/kubeclient/resources.go
b/pkg/config/schema/kubeclient/resources.go
index 6cf97081..8e0ca75d 100644
--- a/pkg/config/schema/kubeclient/resources.go
+++ b/pkg/config/schema/kubeclient/resources.go
@@ -20,9 +20,11 @@ package kubeclient
import (
"context"
"fmt"
+
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvr"
"github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
ktypes "github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/util/ptr"
apiistioioapinetworkingv1 "istio.io/client-go/pkg/apis/networking/v1"
apiistioioapisecurityv1 "istio.io/client-go/pkg/apis/security/v1"
@@ -216,25 +218,78 @@ func getInformerFiltered(c ClientGetter, opts
ktypes.InformerOptions, g schema.G
return
c.Kube().AdmissionregistrationV1().ValidatingWebhookConfigurations().Watch(context.Background(),
options)
}
case gvr.ServiceRoute:
+ // ServiceRoute uses networking.dubbo.apache.org API group, not
networking.istio.io
+ // Use Dynamic client to access it
+ gvr := schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }
l = func(options metav1.ListOptions) (runtime.Object, error) {
- return
c.Dubbo().NetworkingV1().VirtualServices(opts.Namespace).List(context.Background(),
options)
+ return
c.Dynamic().Resource(gvr).Namespace(opts.Namespace).List(context.Background(),
options)
}
w = func(options metav1.ListOptions) (watch.Interface, error) {
- return
c.Dubbo().NetworkingV1().VirtualServices(opts.Namespace).Watch(context.Background(),
options)
+ return
c.Dynamic().Resource(gvr).Namespace(opts.Namespace).Watch(context.Background(),
options)
}
case gvr.SubsetRule:
+ // SubsetRule uses networking.dubbo.apache.org API group, not
networking.istio.io
+ // Use Dynamic client to access it
+ gvr := schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }
l = func(options metav1.ListOptions) (runtime.Object, error) {
- return
c.Dubbo().NetworkingV1().DestinationRules(opts.Namespace).List(context.Background(),
options)
+ // Log the namespace being watched for diagnosis
+ if opts.Namespace == "" {
+ log.Infof("SubsetRule informer: List called for
all namespaces")
+ } else {
+ log.Infof("SubsetRule informer: List called for
namespace %s", opts.Namespace)
+ }
+ return
c.Dynamic().Resource(gvr).Namespace(opts.Namespace).List(context.Background(),
options)
}
w = func(options metav1.ListOptions) (watch.Interface, error) {
- return
c.Dubbo().NetworkingV1().DestinationRules(opts.Namespace).Watch(context.Background(),
options)
+ // Log the namespace being watched for diagnosis
+ if opts.Namespace == "" {
+ log.Infof("SubsetRule informer: Watch called
for all namespaces")
+ } else {
+ log.Infof("SubsetRule informer: Watch called
for namespace %s", opts.Namespace)
+ }
+ watchInterface, err :=
c.Dynamic().Resource(gvr).Namespace(opts.Namespace).Watch(context.Background(),
options)
+ if err != nil {
+ log.Errorf("SubsetRule informer: Watch failed:
%v", err)
+ } else {
+ log.Infof("SubsetRule informer: Watch
connection established successfully")
+ }
+ return watchInterface, err
}
case gvr.PeerAuthentication:
+ peerAuthGVR := schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }
l = func(options metav1.ListOptions) (runtime.Object, error) {
- return
c.Dubbo().SecurityV1().PeerAuthentications(opts.Namespace).List(context.Background(),
options)
+ if opts.Namespace == "" {
+ log.Infof("PeerAuthentication informer: List
called for all namespaces")
+ } else {
+ log.Infof("PeerAuthentication informer: List
called for namespace %s", opts.Namespace)
+ }
+ return
c.Dynamic().Resource(peerAuthGVR).Namespace(opts.Namespace).List(context.Background(),
options)
}
w = func(options metav1.ListOptions) (watch.Interface, error) {
- return
c.Dubbo().SecurityV1().PeerAuthentications(opts.Namespace).Watch(context.Background(),
options)
+ if opts.Namespace == "" {
+ log.Infof("PeerAuthentication informer: Watch
called for all namespaces")
+ } else {
+ log.Infof("PeerAuthentication informer: Watch
called for namespace %s", opts.Namespace)
+ }
+ watchInterface, err :=
c.Dynamic().Resource(peerAuthGVR).Namespace(opts.Namespace).Watch(context.Background(),
options)
+ if err != nil {
+ log.Errorf("PeerAuthentication informer: Watch
failed: %v", err)
+ } else {
+ log.Infof("PeerAuthentication informer: Watch
connection established successfully")
+ }
+ return watchInterface, err
}
case gvr.Pod:
l = func(options metav1.ListOptions) (runtime.Object, error) {
diff --git a/pkg/kube/kclient/client.go b/pkg/kube/kclient/client.go
index eb65ff28..50363027 100644
--- a/pkg/kube/kclient/client.go
+++ b/pkg/kube/kclient/client.go
@@ -20,6 +20,7 @@ package kclient
import (
"context"
"fmt"
+
"github.com/apache/dubbo-kubernetes/pkg/util/ptr"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
@@ -39,7 +40,6 @@ import (
"k8s.io/client-go/tools/cache"
"sync"
- "sync/atomic"
dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
)
@@ -123,36 +123,6 @@ func NewDelayedInformer[T controllers.ComparableObject](c
kube.Client, gvr schem
return newDelayedInformer[T](gvr, inf, delay, filter)
}
-func newDelayedInformer[T controllers.ComparableObject](gvr
schema.GroupVersionResource, getInf func() informerfactory.StartableInformer,
delay kubetypes.DelayedFilter, filter Filter) Informer[T] {
- delayedClient := &delayedClient[T]{
- inf: new(atomic.Pointer[Informer[T]]),
- delayed: delay,
- }
-
- // If resource is not yet known, we will use the delayedClient.
- // When the resource is later loaded, the callback will trigger and
swap our dummy delayedClient
- // with a full client
- readyNow := delay.KnownOrCallback(func(stop <-chan struct{}) {
- // The inf() call is responsible for starting the informer
- inf := getInf()
- fc := &informerClient[T]{
- informer: inf.Informer,
- startInformer: inf.Start,
- }
- applyDynamicFilter(filter, gvr, fc)
- inf.Start(stop)
- log.Infof("%v is now ready, building client",
gvr.GroupResource())
- // Swap out the dummy client with the full one
- delayedClient.set(fc)
- })
- if !readyNow {
- log.Debugf("%v is not ready now, building delayed client",
gvr.GroupResource())
- return delayedClient
- }
- log.Debugf("%v ready now, building client", gvr.GroupResource())
- return newInformerClient[T](gvr, getInf(), filter)
-}
-
func NewFiltered[T controllers.ComparableObject](c kube.Client, filter Filter)
Client[T] {
gvr := types.MustToGVR[T](types.MustGVKFromType[T]())
inf := kubeclient.GetInformerFiltered[T](c, ToOpts(c, gvr, filter), gvr)
@@ -179,25 +149,35 @@ func applyDynamicFilter[T
controllers.ComparableObject](filter Filter, gvr schem
filter.ObjectFilter.AddHandler(func(added, removed sets.String)
{
ic.handlerMu.RLock()
defer ic.handlerMu.RUnlock()
+ log.Infof("applyDynamicFilter: namespace filter handler
triggered for %v: added=%v, removed=%v", gvr, added, removed)
if gvr == dubbogvr.Namespace {
for _, item := range
ic.ListUnfiltered(metav1.NamespaceAll, klabels.Everything()) {
if !added.Contains(item.GetName()) {
continue
}
+ log.Infof("applyDynamicFilter:
triggering OnAdd for namespace %s", item.GetName())
for _, c := range ic.registeredHandlers
{
c.handler.OnAdd(item, false)
}
}
} else {
for ns := range added {
- for _, item := range
ic.ListUnfiltered(ns, klabels.Everything()) {
+ log.Infof("applyDynamicFilter:
namespace %s added, listing unfiltered objects for %v", ns, gvr)
+ items := ic.ListUnfiltered(ns,
klabels.Everything())
+ log.Infof("applyDynamicFilter: found %d
unfiltered objects in namespace %s for %v", len(items), ns, gvr)
+ for _, item := range items {
+ log.Infof("applyDynamicFilter:
triggering OnAdd for %s/%s in namespace %s", item.GetNamespace(),
item.GetName(), ns)
for _, c := range
ic.registeredHandlers {
c.handler.OnAdd(item,
false)
}
}
}
for ns := range removed {
- for _, item := range
ic.ListUnfiltered(ns, klabels.Everything()) {
+ log.Infof("applyDynamicFilter:
namespace %s removed, listing unfiltered objects for %v", ns, gvr)
+ items := ic.ListUnfiltered(ns,
klabels.Everything())
+ log.Infof("applyDynamicFilter: found %d
unfiltered objects in namespace %s for %v", len(items), ns, gvr)
+ for _, item := range items {
+ log.Infof("applyDynamicFilter:
triggering OnDelete for %s/%s in namespace %s", item.GetNamespace(),
item.GetName(), ns)
for _, c := range
ic.registeredHandlers {
c.handler.OnDelete(item)
}
@@ -210,15 +190,32 @@ func applyDynamicFilter[T
controllers.ComparableObject](filter Filter, gvr schem
func (n *informerClient[T]) List(namespace string, selector klabels.Selector)
[]T {
var res []T
+ var filteredCount int
+ var totalCount int
err := cache.ListAllByNamespace(n.informer.GetIndexer(), namespace,
selector, func(i any) {
+ totalCount++
cast := i.(T)
if n.applyFilter(cast) {
res = append(res, cast)
+ } else {
+ filteredCount++
+ // Log filtered objects to help diagnose
+ if objWithNs, ok := any(cast).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ log.Debugf("informerClient.List: filtered out
object %s/%s for namespace=%s", objWithNs.GetNamespace(), objWithNs.GetName(),
namespace)
+ }
}
})
if err != nil {
- fmt.Printf("lister returned err for %v: %v", namespace, err)
+ log.Warnf("informerClient.List: lister returned err for
namespace=%s: %v", namespace, err)
+ }
+ if namespace == metav1.NamespaceAll {
+ log.Infof("informerClient.List: namespace=%s, total=%d,
filtered=%d, result=%d", namespace, totalCount, filteredCount, len(res))
+ } else if filteredCount > 0 {
+ log.Debugf("informerClient.List: filtered out %d items for
namespace=%s (total=%d, result=%d)", filteredCount, namespace, totalCount,
len(res))
}
return res
}
@@ -231,7 +228,20 @@ func (n *informerClient[T]) ListUnfiltered(namespace
string, selector klabels.Se
})
if err != nil {
- fmt.Printf("lister returned err for %v: %v", namespace, err)
+ log.Warnf("informerClient.ListUnfiltered: lister returned err
for namespace=%s: %v", namespace, err)
+ }
+ if namespace == metav1.NamespaceAll {
+ log.Infof("informerClient.ListUnfiltered: found %d unfiltered
objects for namespace=%s (synced=%v)", len(res), namespace,
n.informer.HasSynced())
+ if len(res) > 0 {
+ for i, obj := range res {
+ if objWithNs, ok := any(obj).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+
log.Infof("informerClient.ListUnfiltered: object[%d] %s/%s", i,
objWithNs.GetNamespace(), objWithNs.GetName())
+ }
+ }
+ }
}
return res
}
@@ -285,12 +295,66 @@ func (n *informerClient[T]) ShutdownHandlers() {
func (n *informerClient[T]) AddEventHandler(h cache.ResourceEventHandler)
cache.ResourceEventHandlerRegistration {
fh := cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
+ var nameStr, nsStr string
+ if objWithNs, ok := any(obj).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
if n.filter == nil {
+ log.Debugf("informerClient.AddEventHandler:
FilterFunc allowing object %s/%s (no filter)", nsStr, nameStr)
return true
}
- return n.filter(obj)
+ cast := obj.(T)
+ allowed := n.filter(cast)
+ if !allowed {
+ // Log when objects are filtered out to help
diagnose missing events
+ log.Infof("informerClient.AddEventHandler:
FilterFunc filtered out object %s/%s", nsStr, nameStr)
+ } else {
+ log.Debugf("informerClient.AddEventHandler:
FilterFunc allowing object %s/%s", nsStr, nameStr)
+ }
+ return allowed
+ },
+ Handler: cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ var nameStr, nsStr string
+ if objWithNs, ok := any(obj).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ log.Infof("informerClient.AddEventHandler:
OnAdd called for %s/%s", nsStr, nameStr)
+ h.OnAdd(obj, false)
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ var nameStr, nsStr string
+ if objWithNs, ok := any(newObj).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ log.Infof("informerClient.AddEventHandler:
OnUpdate called for %s/%s", nsStr, nameStr)
+ h.OnUpdate(oldObj, newObj)
+ },
+ DeleteFunc: func(obj interface{}) {
+ var nameStr, nsStr string
+ if objWithNs, ok := any(obj).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ log.Infof("informerClient.AddEventHandler:
OnDelete called for %s/%s", nsStr, nameStr)
+ h.OnDelete(obj)
+ },
},
- Handler: h,
}
n.handlerMu.Lock()
defer n.handlerMu.Unlock()
diff --git a/pkg/kube/kclient/delayed.go b/pkg/kube/kclient/delayed.go
index 4c7b35ef..5278783a 100644
--- a/pkg/kube/kclient/delayed.go
+++ b/pkg/kube/kclient/delayed.go
@@ -18,11 +18,13 @@
package kclient
import (
- "github.com/apache/dubbo-kubernetes/pkg/util/ptr"
"sync"
"sync/atomic"
+ "github.com/apache/dubbo-kubernetes/pkg/util/ptr"
+
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
"github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/slices"
klabels "k8s.io/apimachinery/pkg/labels"
@@ -89,6 +91,8 @@ func (s *delayedClient[T]) set(inf Informer[T]) {
s.hm.Lock()
defer s.hm.Unlock()
for _, h := range s.handlers {
+ // h is a delayedHandler which embeds
ResourceEventHandler, so we can pass it directly
+ // This matches Istio's implementation
reg := inf.AddEventHandler(h)
h.hasSynced.hasSynced.Store(ptr.Of(reg.HasSynced))
}
@@ -210,3 +214,43 @@ func (d delayedIndex[T]) Lookup(key string) []interface{} {
// Not ready yet, return nil
return nil
}
+
+func newDelayedInformer[T controllers.ComparableObject](
+ gvr schema.GroupVersionResource,
+ getInf func() informerfactory.StartableInformer,
+ delay kubetypes.DelayedFilter,
+ filter Filter,
+) Informer[T] {
+ delayedClient := &delayedClient[T]{
+ inf: new(atomic.Pointer[Informer[T]]),
+ delayed: delay,
+ }
+
+ // If resource is not yet known, we will use the delayedClient.
+ // When the resource is later loaded, the callback will trigger and
swap our dummy delayedClient
+ // with a full client
+ readyNow := delay.KnownOrCallback(func(stop <-chan struct{}) {
+ // The inf() call is responsible for starting the informer
+ inf := getInf()
+ fc := &informerClient[T]{
+ informer: inf.Informer,
+ startInformer: inf.Start,
+ }
+ applyDynamicFilter(filter, gvr, fc)
+ // Swap out the dummy client with the full one BEFORE starting
the informer
+ // This ensures handlers are registered before the informer
starts syncing
+ delayedClient.set(fc)
+ inf.Start(stop)
+ log.Infof("%v is now ready, building client",
gvr.GroupResource())
+ })
+ if !readyNow {
+ log.Debugf("%v is not ready now, building delayed client",
gvr.GroupResource())
+ return delayedClient
+ }
+ log.Debugf("%v ready now, building client", gvr.GroupResource())
+ // When readyNow is true, we need to ensure the informer is registered
with the factory
+ // so InformerFactory.Start() can pick it up.
+ inf := getInf()
+ fc := newInformerClient[T](gvr, inf, filter)
+ return fc
+}
diff --git a/pkg/kube/krt/informer.go b/pkg/kube/krt/informer.go
index 0ddefe30..120ebaef 100644
--- a/pkg/kube/krt/informer.go
+++ b/pkg/kube/krt/informer.go
@@ -19,6 +19,7 @@ package krt
import (
"fmt"
+
"github.com/apache/dubbo-kubernetes/pkg/util/ptr"
"github.com/apache/dubbo-kubernetes/pkg/kube"
@@ -155,7 +156,44 @@ func (i *informer[I]) Register(f func(o Event[I]))
HandlerRegistration {
func (i *informer[I]) RegisterBatch(f func(o []Event[I]), runExistingState
bool) HandlerRegistration {
synced := i.inf.AddEventHandler(informerEventHandler[I](func(o
Event[I], initialSync bool) {
- f([]Event[I]{o})
+ // Only process events if runExistingState is true OR this is
not an initial sync event
+ // This matches Istio's behavior: runExistingState=false means
skip initial sync events
+ if runExistingState || !initialSync {
+ // Log all events to help diagnose missing events
+ var nameStr, nsStr string
+ if o.New != nil {
+ if objWithNs, ok := any(*o.New).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ } else if o.Old != nil {
+ if objWithNs, ok := any(*o.Old).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ }
+ log.Debugf("informer.RegisterBatch: processing event %s
for %s/%s (initialSync=%v, runExistingState=%v)", o.Event, nsStr, nameStr,
initialSync, runExistingState)
+ f([]Event[I]{o})
+ } else {
+ // Log skipped events to help diagnose
+ var nameStr, nsStr string
+ if o.New != nil {
+ if objWithNs, ok := any(*o.New).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ }
+ log.Debugf("informer.RegisterBatch: skipping initial
sync event for %s/%s (initialSync=%v, runExistingState=%v)", nsStr, nameStr,
initialSync, runExistingState)
+ }
}))
base := i.baseSyncer
handler := pollSyncer{
diff --git a/pkg/kube/namespace/filter.go b/pkg/kube/namespace/filter.go
index b0f60c7d..0e59b309 100644
--- a/pkg/kube/namespace/filter.go
+++ b/pkg/kube/namespace/filter.go
@@ -107,10 +107,12 @@ func (d *discoveryNamespacesFilter) Filter(obj any) bool {
// When an object is deleted, obj could be a DeletionFinalStateUnknown
marker item.
ns, ok := extractObjectNamespace(obj)
if !ok {
+ log.Debugf("discoveryNamespacesFilter.Filter: failed to extract
namespace from object, rejecting")
return false
}
if ns == "" {
// Cluster scoped resources. Always included
+ log.Debugf("discoveryNamespacesFilter.Filter: cluster-scoped
resource, allowing")
return true
}
@@ -118,11 +120,18 @@ func (d *discoveryNamespacesFilter) Filter(obj any) bool {
defer d.lock.RUnlock()
// permit all objects if discovery selectors are not specified
if len(d.discoverySelectors) == 0 {
+ log.Debugf("discoveryNamespacesFilter.Filter: no discovery
selectors, allowing namespace %s", ns)
return true
}
// permit if object resides in a namespace labeled for discovery
- return d.discoveryNamespaces.Contains(ns)
+ allowed := d.discoveryNamespaces.Contains(ns)
+ if !allowed {
+ log.Infof("discoveryNamespacesFilter.Filter: namespace %s not
in discoveryNamespaces (selectors=%d, namespaces=%v), rejecting", ns,
len(d.discoverySelectors), d.discoveryNamespaces)
+ } else {
+ log.Debugf("discoveryNamespacesFilter.Filter: namespace %s in
discoveryNamespaces, allowing", ns)
+ }
+ return allowed
}
// AddHandler registers a handler on namespace, which will be triggered when
namespace selected or deselected.
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index 5a2de13a..bf5e126b 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -192,6 +192,13 @@ func Receive(ctx ConnectionContext) {
// Connection logged in initConnection() after addCon()
to ensure accurate counting
}
+ resourceNamesStr := " [wildcard]"
+ if len(req.ResourceNames) > 0 {
+ resourceNamesStr = " [" +
strings.Join(req.ResourceNames, ", ") + "]"
+ }
+ log.Infof("%s: RAW REQ %s resources:%d nonce:%s%s",
+ model.GetShortType(req.TypeUrl), con.conID,
len(req.ResourceNames), req.ResponseNonce, resourceNamesStr)
+
select {
case con.reqChan <- req:
case <-con.stream.Context().Done():
@@ -316,6 +323,32 @@ func ShouldRespond(w Watcher, id string, request
*discovery.DiscoveryRequest) (b
// A nonce becomes stale following a newer nonce being sent to Envoy.
// previousInfo.NonceSent can be empty if we previously had
shouldRespond=true but didn't send any resources.
if request.ResponseNonce != previousInfo.NonceSent {
+ newResources := sets.New(request.ResourceNames...)
+ // Special-case proxyless gRPC: Envoy will send a "stale" nonce
when it changes
+ // subscriptions (e.g., after ServiceRoute introduces subset
clusters). Treat this
+ // as a resource change rather than an ACK so the new clusters
get a response.
+ previousResourcesCopy := previousInfo.ResourceNames.Copy()
+ if !newResources.Equals(previousResourcesCopy) &&
len(newResources) > 0 {
+ log.Infof("%s: REQ %s nonce mismatch (got %s, sent %s)
but resources changed -> responding",
+ stype, id, request.ResponseNonce,
previousInfo.NonceSent)
+ added := newResources.Difference(previousResourcesCopy)
+ w.UpdateWatchedResource(request.TypeUrl, func(wr
*WatchedResource) *WatchedResource {
+ if wr == nil {
+ return nil
+ }
+ wr.LastError = ""
+ wr.ResourceNames = newResources
+ // keep previous nonce so the subsequent ACK
can match
+ return wr
+ })
+ if len(added) == 0 {
+ // Still respond to make sure client receives
an update even if map difference logic
+ // thinks nothing was added (e.g., only removal
happened).
+ return true, ResourceDelta{Subscribed: added}
+ }
+ return true, ResourceDelta{Subscribed: added}
+ }
+
// Expired/stale nonce - don't respond, just log at debug level
if previousInfo.NonceSent == "" {
// We never sent a nonce, but client sent one - this is
unusual but treat as expired
diff --git a/samples/grpc-app/grpc-app.yaml b/samples/grpc-app/grpc-app.yaml
index 506b6405..cb3bef55 100644
--- a/samples/grpc-app/grpc-app.yaml
+++ b/samples/grpc-app/grpc-app.yaml
@@ -32,13 +32,76 @@ spec:
apiVersion: apps/v1
kind: Deployment
metadata:
- name: consumer
+ name: consumer-v1
+ namespace: grpc-app
+spec:
+ replicas: 2
+ selector:
+ matchLabels:
+ app: consumer
+ version: v1
+ template:
+ metadata:
+ annotations:
+ proxyless.dubbo.apache.org/inject: "true"
+ inject.dubbo.apache.org/templates: grpc-agent
+ proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts":
true}'
+ labels:
+ app: consumer
+ version: v1
+ spec:
+ containers:
+ - name: app
+ image: mfordjody/grpc-consumer:dev-debug
+ imagePullPolicy: Always
+ ports:
+ - containerPort: 17070
+ protocol: TCP
+ name: grpc
+ env:
+ - name: INSTANCE_IP
+ valueFrom:
+ fieldRef:
+ apiVersion: v1
+ fieldPath: status.podIP
+ - name: SERVICE_VERSION
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.labels['version']
+ - name: SERVICE_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: SERVICE_PORT
+ value: "17070"
+ readinessProbe:
+ tcpSocket:
+ port: 17070
+ initialDelaySeconds: 5
+ periodSeconds: 5
+ timeoutSeconds: 2
+ successThreshold: 1
+ failureThreshold: 3
+ livenessProbe:
+ tcpSocket:
+ port: 17070
+ initialDelaySeconds: 10
+ periodSeconds: 10
+ timeoutSeconds: 2
+ successThreshold: 1
+ failureThreshold: 3
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: consumer-v2
namespace: grpc-app
spec:
replicas: 2
selector:
matchLabels:
app: consumer
+ version: v2
template:
metadata:
annotations:
@@ -47,6 +110,7 @@ spec:
proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts":
true}'
labels:
app: consumer
+ version: v2
spec:
containers:
- name: app
@@ -62,6 +126,16 @@ spec:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
+ - name: SERVICE_VERSION
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.labels['version']
+ - name: SERVICE_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: SERVICE_PORT
+ value: "17070"
readinessProbe:
tcpSocket:
port: 17070
diff --git a/tests/grpc-app/consumer/main.go b/tests/grpc-app/consumer/main.go
index 8e850edd..89da67e9 100644
--- a/tests/grpc-app/consumer/main.go
+++ b/tests/grpc-app/consumer/main.go
@@ -26,6 +26,7 @@ import (
"os"
"os/signal"
"regexp"
+ "strconv"
"strings"
"syscall"
"time"
@@ -47,7 +48,12 @@ var (
type echoServer struct {
pb.UnimplementedEchoServiceServer
pb.UnimplementedEchoTestServiceServer
- hostname string
+ hostname string
+ serviceVersion string
+ namespace string
+ instanceIP string
+ cluster string
+ servicePort int
}
func (s *echoServer) Echo(ctx context.Context, req *pb.EchoRequest)
(*pb.EchoResponse, error) {
@@ -56,8 +62,13 @@ func (s *echoServer) Echo(ctx context.Context, req
*pb.EchoRequest) (*pb.EchoRes
}
log.Printf("Received: %v", req.Message)
return &pb.EchoResponse{
- Message: req.Message,
- Hostname: s.hostname,
+ Message: req.Message,
+ Hostname: s.hostname,
+ ServiceVersion: s.serviceVersion,
+ Namespace: s.namespace,
+ Ip: s.instanceIP,
+ Cluster: s.cluster,
+ ServicePort: int32(s.servicePort),
}, nil
}
@@ -98,7 +109,14 @@ func (s *echoServer) ForwardEcho(ctx context.Context, req
*pb.ForwardEchoRequest
output := make([]string, 0, count)
for i := int32(0); i < count; i++ {
- line := fmt.Sprintf("[%d body] Hostname=%s", i, s.hostname)
+ line := fmt.Sprintf("[%d body] Hostname=%s ServiceVersion=%s
ServicePort=%d Namespace=%s",
+ i, s.hostname, s.serviceVersion, s.servicePort,
s.namespace)
+ if s.instanceIP != "" {
+ line += fmt.Sprintf(" IP=%s", s.instanceIP)
+ }
+ if s.cluster != "" {
+ line += fmt.Sprintf(" Cluster=%s", s.cluster)
+ }
output = append(output, line)
}
@@ -274,6 +292,15 @@ func waitForBootstrapFile(bootstrapPath string, maxWait
time.Duration) error {
}
}
+func firstNonEmpty(values ...string) string {
+ for _, v := range values {
+ if strings.TrimSpace(v) != "" {
+ return v
+ }
+ }
+ return ""
+}
+
func main() {
flag.Parse()
@@ -288,6 +315,24 @@ func main() {
hostname = "unknown"
}
+ namespace := firstNonEmpty(os.Getenv("SERVICE_NAMESPACE"),
os.Getenv("POD_NAMESPACE"), "default")
+ serviceVersion := firstNonEmpty(
+ os.Getenv("SERVICE_VERSION"),
+ os.Getenv("POD_VERSION"),
+ os.Getenv("VERSION"),
+ )
+ if serviceVersion == "" {
+ serviceVersion = "unknown"
+ }
+ cluster := os.Getenv("SERVICE_CLUSTER")
+ instanceIP := os.Getenv("INSTANCE_IP")
+ servicePort := *port
+ if sp := os.Getenv("SERVICE_PORT"); sp != "" {
+ if parsed, err := strconv.Atoi(sp); err == nil {
+ servicePort = parsed
+ }
+ }
+
// Get bootstrap file path from environment variable or use default
bootstrapPath := os.Getenv("GRPC_XDS_BOOTSTRAP")
if bootstrapPath == "" {
@@ -315,7 +360,14 @@ func main() {
log.Fatalf("Failed to create xDS gRPC server: %v", err)
}
- es := &echoServer{hostname: hostname}
+ es := &echoServer{
+ hostname: hostname,
+ serviceVersion: serviceVersion,
+ namespace: namespace,
+ instanceIP: instanceIP,
+ cluster: cluster,
+ servicePort: servicePort,
+ }
pb.RegisterEchoServiceServer(server, es)
pb.RegisterEchoTestServiceServer(server, es)
// Enable reflection API for grpcurl to discover services
diff --git a/tests/grpc-app/producer/main.go b/tests/grpc-app/producer/main.go
index 9916f44d..8bb23199 100644
--- a/tests/grpc-app/producer/main.go
+++ b/tests/grpc-app/producer/main.go
@@ -525,8 +525,29 @@ func (s *testServerImpl) ForwardEcho(ctx context.Context,
req *pb.ForwardEchoReq
continue
}
- log.Printf("ForwardEcho: request %d succeeded: Hostname=%s",
i+1, resp.Hostname)
- output = append(output, fmt.Sprintf("[%d body] Hostname=%s", i,
resp.Hostname))
+ log.Printf("ForwardEcho: request %d succeeded: Hostname=%s
ServiceVersion=%s Namespace=%s IP=%s",
+ i+1, resp.Hostname, resp.ServiceVersion,
resp.Namespace, resp.Ip)
+
+ lineParts := []string{
+ fmt.Sprintf("[%d body] Hostname=%s", i, resp.Hostname),
+ }
+ if resp.ServiceVersion != "" {
+ lineParts = append(lineParts,
fmt.Sprintf("ServiceVersion=%s", resp.ServiceVersion))
+ }
+ if resp.Namespace != "" {
+ lineParts = append(lineParts,
fmt.Sprintf("Namespace=%s", resp.Namespace))
+ }
+ if resp.Ip != "" {
+ lineParts = append(lineParts, fmt.Sprintf("IP=%s",
resp.Ip))
+ }
+ if resp.Cluster != "" {
+ lineParts = append(lineParts, fmt.Sprintf("Cluster=%s",
resp.Cluster))
+ }
+ if resp.ServicePort > 0 {
+ lineParts = append(lineParts,
fmt.Sprintf("ServicePort=%d", resp.ServicePort))
+ }
+
+ output = append(output, strings.Join(lineParts, " "))
// Small delay between successful requests to avoid
overwhelming the server
if i < count-1 {
diff --git a/tests/grpc-app/proto/echo.proto b/tests/grpc-app/proto/echo.proto
index c0bd61e5..1d7c32e6 100644
--- a/tests/grpc-app/proto/echo.proto
+++ b/tests/grpc-app/proto/echo.proto
@@ -41,6 +41,11 @@ message EchoRequest {
message EchoResponse {
string message = 1;
string hostname = 2;
+ string service_version = 3;
+ string namespace = 4;
+ string ip = 5;
+ string cluster = 6;
+ int32 service_port = 7;
}
message ForwardEchoRequest {