This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new e51536d9 fix krt and xds api (#822)
e51536d9 is described below

commit e51536d9ec888ed58e60b7b9ddac0dd757f7763e
Author: Jian Zhong <[email protected]>
AuthorDate: Tue Nov 18 21:44:35 2025 +0800

    fix krt and xds api (#822)
    
    * fix krt and xds api
    
    * fix ci
    
    * fix ci
---
 dubbod/planet/pkg/bootstrap/server.go              |  89 ++++++-
 dubbod/planet/pkg/bootstrap/validation.go          |   2 +
 dubbod/planet/pkg/config/aggregate/config.go       |  11 +-
 dubbod/planet/pkg/config/kube/crdclient/client.go  | 161 ++++++++++--
 dubbod/planet/pkg/config/kube/crdclient/types.go   | 275 +++++++++++++++++++--
 dubbod/planet/pkg/model/push_context.go            | 196 ++++++++++++++-
 dubbod/planet/pkg/model/serviceroute.go            |  34 ++-
 dubbod/planet/pkg/model/subsetrule.go              |  17 ++
 dubbod/planet/pkg/networking/grpcgen/cds.go        |  77 +++++-
 dubbod/planet/pkg/networking/grpcgen/rds.go        | 147 +++++++++--
 dubbod/planet/pkg/xds/ads.go                       |  10 +-
 dubbod/planet/pkg/xds/delta.go                     |  23 +-
 .../planet/pkg/xds/endpoints/endpoint_builder.go   |  14 +-
 dubbod/planet/pkg/xds/xdsgen.go                    |  50 ++++
 manifests/charts/base/files/crd-all.yaml           |   4 +-
 .../dubbo-discovery/templates/clusterrole.yaml     |   3 +
 .../templates/validatingwebhookconfiguration.yaml  |  37 ++-
 pkg/config/schema/collections/collections.agent.go |   6 +-
 pkg/config/schema/collections/collections.go       |   6 +-
 pkg/config/schema/gvk/resources.go                 |   2 +-
 pkg/config/schema/kind/resources.go                |   2 +-
 pkg/config/schema/kubeclient/resources.go          |  67 ++++-
 pkg/kube/kclient/client.go                         | 138 ++++++++---
 pkg/kube/kclient/delayed.go                        |  46 +++-
 pkg/kube/krt/informer.go                           |  40 ++-
 pkg/kube/namespace/filter.go                       |  11 +-
 pkg/xds/server.go                                  |  33 +++
 samples/grpc-app/grpc-app.yaml                     |  76 +++++-
 tests/grpc-app/consumer/main.go                    |  62 ++++-
 tests/grpc-app/producer/main.go                    |  25 +-
 tests/grpc-app/proto/echo.proto                    |   5 +
 31 files changed, 1499 insertions(+), 170 deletions(-)

diff --git a/dubbod/planet/pkg/bootstrap/server.go 
b/dubbod/planet/pkg/bootstrap/server.go
index e2976797..aa363d4f 100644
--- a/dubbod/planet/pkg/bootstrap/server.go
+++ b/dubbod/planet/pkg/bootstrap/server.go
@@ -47,6 +47,7 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/config/constants"
        "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
        "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
        "github.com/apache/dubbo-kubernetes/pkg/ctrlz"
        "github.com/apache/dubbo-kubernetes/pkg/filewatcher"
        "github.com/apache/dubbo-kubernetes/pkg/h2c"
@@ -252,8 +253,7 @@ func NewServer(args *PlanetArgs, initFuncs 
...func(*Server)) (*Server, error) {
                }
        }
 
-       s.initRegistryEventHandlers()
-
+       // Note: initRegistryEventHandlers is called in Start() after config 
controller starts
        s.initDiscoveryService()
 
        s.startCA(caOpts)
@@ -284,6 +284,10 @@ func (s *Server) Start(stop <-chan struct{}) error {
 
        s.XDSServer.CachesSynced()
 
+       // Register event handlers after config controller has started and 
synced
+       // This ensures that config changes are properly detected and handled
+       s.initRegistryEventHandlers()
+
        if s.secureGrpcAddress != "" {
                grpcListener, err := net.Listen("tcp", s.secureGrpcAddress)
                if err != nil {
@@ -424,13 +428,84 @@ func (s *Server) initSecureDiscoveryService(args 
*PlanetArgs, trustDomain string
 func (s *Server) initRegistryEventHandlers() {
        log.Info("initializing registry event handlers")
 
-       if s.configController != nil {
-               configHandler := func(prev config.Config, curr config.Config, 
event model.Event) {}
-               schemas := collections.Planet.All()
-               for _, schema := range schemas {
-                       
s.configController.RegisterEventHandler(schema.GroupVersionKind(), 
configHandler)
+       if s.configController == nil {
+               log.Warnf("initRegistryEventHandlers: configController is nil, 
cannot register event handlers")
+               return
+       }
+
+       log.Infof("initRegistryEventHandlers: configController is available, 
registering event handlers")
+
+       configHandler := func(prev config.Config, curr config.Config, event 
model.Event) {
+               // Log ALL events at INFO level to ensure visibility
+               log.Infof("configHandler: received event %s for config %v 
(prev.Name=%s, curr.Name=%s, prev.Namespace=%s, curr.Namespace=%s)",
+                       event, curr.GroupVersionKind, prev.Name, curr.Name, 
prev.Namespace, curr.Namespace)
+
+               // Handle delete events - use prev config if curr is empty
+               cfg := curr
+               if event == model.EventDelete && curr.Name == "" {
+                       cfg = prev
+               }
+
+               // Build ConfigKey for the changed config
+               // Find the schema to get the kind.Kind
+               schema, found := 
collections.Planet.FindByGroupVersionKind(cfg.GroupVersionKind)
+               if !found {
+                       log.Warnf("configHandler: schema not found for %v, 
skipping", cfg.GroupVersionKind)
+                       return
+               }
+
+               // Map GVK to kind.Kind using schema identifier
+               // This matches Istio's approach of using gvk.MustToKind, but 
we use schema.Identifier() instead
+               schemaID := schema.Identifier()
+               log.Infof("configHandler: processing config change, schema 
identifier=%s, GVK=%v, name=%s/%s, event=%s",
+                       schemaID, cfg.GroupVersionKind, cfg.Namespace, 
cfg.Name, event)
+
+               var configKind kind.Kind
+               switch schemaID {
+               case "SubsetRule":
+                       configKind = kind.SubsetRule
+               case "serviceRoute", "ServiceRoute":
+                       configKind = kind.ServiceRoute
+               case "PeerAuthentication":
+                       configKind = kind.PeerAuthentication
+               default:
+                       log.Debugf("configHandler: unknown schema identifier %s 
for %v, skipping", schemaID, cfg.GroupVersionKind)
+                       return
+               }
+
+               configKey := model.ConfigKey{
+                       Kind:      configKind,
+                       Name:      cfg.Name,
+                       Namespace: cfg.Namespace,
                }
+
+               // Log the config change
+               log.Infof("configHandler: %s event for %s/%s/%s", event, 
configKey.Kind, configKey.Namespace, configKey.Name)
+
+               // CRITICAL: For SubsetRule and ServiceRoute changes, we need 
Full push to ensure
+               // PushContext is re-initialized and configuration is reloaded
+               // This is because these configs affect CDS/RDS generation and 
need complete context refresh
+               needsFullPush := configKind == kind.SubsetRule || configKind == 
kind.ServiceRoute
+
+               // Trigger ConfigUpdate to push changes to all connected proxies
+               s.XDSServer.ConfigUpdate(&model.PushRequest{
+                       ConfigsUpdated: sets.New(configKey),
+                       Reason:         
model.NewReasonStats(model.DependentResource),
+                       Full:           needsFullPush, // Full push for 
SubsetRule/ServiceRoute to reload PushContext
+               })
        }
+       schemas := collections.Planet.All()
+       log.Infof("initRegistryEventHandlers: found %d schemas to register", 
len(schemas))
+       registeredCount := 0
+       for _, schema := range schemas {
+               gvk := schema.GroupVersionKind()
+               schemaID := schema.Identifier()
+               log.Infof("initRegistryEventHandlers: registering event handler 
for %s (GVK: %v)", schemaID, gvk)
+               s.configController.RegisterEventHandler(gvk, configHandler)
+               registeredCount++
+               log.Infof("initRegistryEventHandlers: successfully registered 
event handler for %s (GVK: %v)", schemaID, gvk)
+       }
+       log.Infof("initRegistryEventHandlers: successfully registered event 
handlers for %d schemas", registeredCount)
 }
 
 func (s *Server) addReadinessProbe(name string, fn readinessProbe) {
diff --git a/dubbod/planet/pkg/bootstrap/validation.go 
b/dubbod/planet/pkg/bootstrap/validation.go
index 6c53b01f..05a7df9d 100644
--- a/dubbod/planet/pkg/bootstrap/validation.go
+++ b/dubbod/planet/pkg/bootstrap/validation.go
@@ -19,6 +19,7 @@ package bootstrap
 
 import (
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
        "github.com/apache/dubbo-kubernetes/pkg/log"
        "github.com/apache/dubbo-kubernetes/pkg/webhooks/server"
        "github.com/apache/dubbo-kubernetes/pkg/webhooks/validation/controller"
@@ -30,6 +31,7 @@ func (s *Server) initConfigValidation(args *PlanetArgs) error 
{
        }
        log.Info("initializing config validator")
        params := server.Options{
+               Schemas:      collections.Planet,
                DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
                Mux:          s.httpsMux,
        }
diff --git a/dubbod/planet/pkg/config/aggregate/config.go 
b/dubbod/planet/pkg/config/aggregate/config.go
index cc64232a..f90702a8 100644
--- a/dubbod/planet/pkg/config/aggregate/config.go
+++ b/dubbod/planet/pkg/config/aggregate/config.go
@@ -28,6 +28,7 @@ import (
 
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
        "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/log"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
 )
 
@@ -158,11 +159,19 @@ func makeStore(stores []model.ConfigStore, writer 
model.ConfigStore) (model.Conf
 }
 
 func (cr *storeCache) RegisterEventHandler(kind config.GroupVersionKind, 
handler model.EventHandler) {
-       for _, cache := range cr.caches {
+       log := log.RegisterScope("aggregate", "aggregate config controller")
+       log.Infof("RegisterEventHandler: registering handler for %v across %d 
caches", kind, len(cr.caches))
+       registeredCount := 0
+       for i, cache := range cr.caches {
                if _, exists := cache.Schemas().FindByGroupVersionKind(kind); 
exists {
+                       log.Infof("RegisterEventHandler: registering handler 
for %v on cache[%d] (type=%T)", kind, i, cache)
                        cache.RegisterEventHandler(kind, handler)
+                       registeredCount++
+               } else {
+                       log.Debugf("RegisterEventHandler: cache[%d] does not 
support %v, skipping", i, kind)
                }
        }
+       log.Infof("RegisterEventHandler: successfully registered handler for %v 
on %d caches", kind, registeredCount)
 }
 
 func (cr *storeCache) Run(stop <-chan struct{}) {
diff --git a/dubbod/planet/pkg/config/kube/crdclient/client.go 
b/dubbod/planet/pkg/config/kube/crdclient/client.go
index c251efab..e4cab973 100644
--- a/dubbod/planet/pkg/config/kube/crdclient/client.go
+++ b/dubbod/planet/pkg/config/kube/crdclient/client.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
        "github.com/apache/dubbo-kubernetes/pkg/config"
        "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
        "github.com/apache/dubbo-kubernetes/pkg/kube"
        "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
        "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
@@ -152,19 +153,20 @@ func (cl *Client) allKinds() 
map[config.GroupVersionKind]nsStore {
 }
 
 func (cl *Client) addCRD(name string, opts krt.OptionsBuilder) {
-       cl.logger.Debugf("adding CRD %q", name)
+       cl.logger.Infof("addCRD: adding CRD %q", name)
        s, f := cl.schemasByCRDName[name]
        if !f {
-               cl.logger.Debugf("added resource that we are not watching: %v", 
name)
+               cl.logger.Warnf("addCRD: added resource that we are not 
watching: %v", name)
                return
        }
        resourceGVK := s.GroupVersionKind()
        gvr := s.GroupVersionResource()
+       cl.logger.Infof("addCRD: CRD %q maps to GVK %v, GVR %v", name, 
resourceGVK, gvr)
 
        cl.kindsMu.Lock()
        defer cl.kindsMu.Unlock()
        if _, f := cl.kinds[resourceGVK]; f {
-               cl.logger.Debugf("added resource that already exists: %v", 
resourceGVK)
+               cl.logger.Warnf("addCRD: added resource that already exists: 
%v", resourceGVK)
                return
        }
        translateFunc, f := translationMap[resourceGVK]
@@ -189,6 +191,9 @@ func (cl *Client) addCRD(name string, opts 
krt.OptionsBuilder) {
        var namespaceFilter kubetypes.DynamicObjectFilter
        if !s.IsClusterScoped() {
                namespaceFilter = cl.client.ObjectFilter()
+               cl.logger.Infof("addCRD: using namespace filter for %v (not 
cluster-scoped)", resourceGVK)
+       } else {
+               cl.logger.Infof("addCRD: no namespace filter for %v 
(cluster-scoped)", resourceGVK)
        }
 
        filter := kubetypes.Filter{
@@ -196,15 +201,23 @@ func (cl *Client) addCRD(name string, opts 
krt.OptionsBuilder) {
                ObjectTransform: transform,
                FieldSelector:   fieldSelector,
        }
+       cl.logger.Infof("addCRD: created filter for %v (namespaceFilter=%v, 
extraFilter=%v, fieldSelector=%v)", resourceGVK, namespaceFilter != nil, 
extraFilter != nil, fieldSelector)
 
        var kc kclient.Untyped
        if s.IsBuiltin() {
                kc = kclient.NewUntypedInformer(cl.client, gvr, filter)
        } else {
+               // For SubsetRule and ServiceRoute, we use Dynamic client which 
returns unstructured objects
+               // So we need to use DynamicInformer type to ensure the 
informer expects unstructured objects
+               informerType := kubetypes.StandardInformer
+               if resourceGVK == gvk.SubsetRule || resourceGVK == 
gvk.ServiceRoute || resourceGVK == gvk.PeerAuthentication {
+                       informerType = kubetypes.DynamicInformer
+                       cl.logger.Infof("addCRD: using DynamicInformer for %v 
(uses Dynamic client)", resourceGVK)
+               }
                kc = kclient.NewDelayedInformer[controllers.Object](
                        cl.client,
                        gvr,
-                       kubetypes.StandardInformer,
+                       informerType,
                        filter,
                )
        }
@@ -213,15 +226,58 @@ func (cl *Client) addCRD(name string, opts 
krt.OptionsBuilder) {
        collection := krt.MapCollection(wrappedClient, func(obj 
controllers.Object) config.Config {
                cfg := translateFunc(obj)
                cfg.Domain = cl.domainSuffix
+               // Only log at Debug level to avoid spam, but keep it available 
for diagnosis
+               cl.logger.Debugf("addCRD: MapCollection translating object 
%s/%s to config for %v", obj.GetNamespace(), obj.GetName(), resourceGVK)
                return cfg
        }, opts.WithName("collection/"+resourceGVK.Kind)...)
        index := krt.NewNamespaceIndex(collection)
+       // Register a debug handler to track all events from the wrappedClient 
(before MapCollection)
+       // This helps diagnose if events are being filtered before reaching the 
collection
+       wrappedClientDebugHandler := wrappedClient.RegisterBatch(func(o 
[]krt.Event[controllers.Object]) {
+               if len(o) > 0 {
+                       cl.logger.Infof("addCRD: wrappedClient event detected 
for %v: %d events", resourceGVK, len(o))
+                       for i, event := range o {
+                               var nameStr, nsStr string
+                               if event.New != nil {
+                                       obj := *event.New
+                                       nameStr = obj.GetName()
+                                       nsStr = obj.GetNamespace()
+                               } else if event.Old != nil {
+                                       obj := *event.Old
+                                       nameStr = obj.GetName()
+                                       nsStr = obj.GetNamespace()
+                               }
+                               cl.logger.Infof("addCRD: wrappedClient 
event[%d] %s for %v (name=%s/%s)",
+                                       i, event.Event, resourceGVK, nsStr, 
nameStr)
+                       }
+               }
+       }, false)
+       // Register a debug handler to track all events from the collection
+       // This helps diagnose why new config changes might not trigger events
+       // Use false to match Istio's implementation - only process future 
events, not initial sync
+       debugHandler := collection.RegisterBatch(func(o 
[]krt.Event[config.Config]) {
+               if len(o) > 0 {
+                       cl.logger.Infof("addCRD: collection event detected for 
%v: %d events", resourceGVK, len(o))
+                       for i, event := range o {
+                               var nameStr, nsStr string
+                               if event.New != nil {
+                                       nameStr = event.New.Name
+                                       nsStr = event.New.Namespace
+                               } else if event.Old != nil {
+                                       nameStr = event.Old.Name
+                                       nsStr = event.Old.Namespace
+                               }
+                               cl.logger.Infof("addCRD: collection event[%d] 
%s for %v (name=%s/%s)",
+                                       i, event.Event, resourceGVK, nsStr, 
nameStr)
+                       }
+               }
+       }, false)
        cl.kinds[resourceGVK] = nsStore{
                collection: collection,
                index:      index,
                handlers: []krt.HandlerRegistration{
-                       collection.RegisterBatch(func(o 
[]krt.Event[config.Config]) {
-                       }, false),
+                       wrappedClientDebugHandler,
+                       debugHandler,
                },
        }
 }
@@ -238,23 +294,59 @@ func (cl *Client) Schemas() collection.Schemas {
 }
 
 func (cl *Client) RegisterEventHandler(kind config.GroupVersionKind, handler 
model.EventHandler) {
-       if c, ok := cl.kind(kind); ok {
-               c.handlers = append(c.handlers, 
c.collection.RegisterBatch(func(o []krt.Event[config.Config]) {
-                       for _, event := range o {
-                               switch event.Event {
-                               case controllers.EventAdd:
+       cl.kindsMu.Lock()
+       defer cl.kindsMu.Unlock()
+
+       c, ok := cl.kinds[kind]
+       if !ok {
+               cl.logger.Warnf("unknown type: %s", kind)
+               return
+       }
+
+       cl.logger.Infof("RegisterEventHandler: registering handler for %v", 
kind)
+       // Match Istio's implementation: RegisterBatch returns a 
HandlerRegistration that is already
+       // registered with the collection, so we just need to append it to 
handlers to keep a reference
+       // The handler will be called by the collection when events occur, 
regardless of whether we
+       // update cl.kinds[kind] or not. However, we update it to keep the 
handlers slice in sync.
+       handlerReg := c.collection.RegisterBatch(func(o 
[]krt.Event[config.Config]) {
+               cl.logger.Infof("RegisterEventHandler: batch handler triggered 
for %v with %d events", kind, len(o))
+               for i, event := range o {
+                       var nameStr, nsStr string
+                       if event.New != nil {
+                               nameStr = event.New.Name
+                               nsStr = event.New.Namespace
+                       } else if event.Old != nil {
+                               nameStr = event.Old.Name
+                               nsStr = event.Old.Namespace
+                       }
+                       cl.logger.Infof("RegisterEventHandler: processing 
event[%d] %s for %v (name=%s/%s)",
+                               i, event.Event, kind, nsStr, nameStr)
+                       switch event.Event {
+                       case controllers.EventAdd:
+                               if event.New != nil {
                                        handler(config.Config{}, *event.New, 
model.Event(event.Event))
-                               case controllers.EventUpdate:
+                               } else {
+                                       cl.logger.Warnf("RegisterEventHandler: 
EventAdd but event.New is nil, skipping")
+                               }
+                       case controllers.EventUpdate:
+                               if event.Old != nil && event.New != nil {
                                        handler(*event.Old, *event.New, 
model.Event(event.Event))
-                               case controllers.EventDelete:
+                               } else {
+                                       cl.logger.Warnf("RegisterEventHandler: 
EventUpdate but event.Old or event.New is nil, skipping")
+                               }
+                       case controllers.EventDelete:
+                               if event.Old != nil {
                                        handler(config.Config{}, *event.Old, 
model.Event(event.Event))
+                               } else {
+                                       cl.logger.Warnf("RegisterEventHandler: 
EventDelete but event.Old is nil, skipping")
                                }
                        }
-               }, false))
-               return
-       }
-
-       cl.logger.Warnf("unknown type: %s", kind)
+               }
+       }, false)
+       // Update handlers slice to keep reference (though not strictly 
necessary for functionality)
+       c.handlers = append(c.handlers, handlerReg)
+       cl.kinds[kind] = c
+       cl.logger.Infof("RegisterEventHandler: successfully registered handler 
for %v", kind)
 }
 
 func (cl *Client) Get(typ config.GroupVersionKind, name, namespace string) 
*config.Config {
@@ -333,14 +425,43 @@ func (cl *Client) Delete(typ config.GroupVersionKind, 
name, namespace string, re
 func (cl *Client) List(kind config.GroupVersionKind, namespace string) 
[]config.Config {
        h, f := cl.kind(kind)
        if !f {
+               cl.logger.Warnf("List: unknown kind %v", kind)
                return nil
        }
 
+       // Check if collection is synced
+       if !h.collection.HasSynced() {
+               cl.logger.Warnf("List: collection for %v is not synced yet", 
kind)
+       }
+
+       var configs []config.Config
        if namespace == metav1.NamespaceAll {
-               return h.collection.List()
+               // Get all configs from collection
+               configs = h.collection.List()
+               cl.logger.Infof("List: found %d configs for %v (namespace=all, 
synced=%v)",
+                       len(configs), kind, h.collection.HasSynced())
+               if len(configs) > 0 {
+                       for i, cfg := range configs {
+                               cl.logger.Infof("List: config[%d] %s/%s for 
%v", i, cfg.Namespace, cfg.Name, kind)
+                       }
+               } else {
+                       cl.logger.Warnf("List: collection returned 0 configs 
for %v (synced=%v), this may indicate informer is not watching correctly or 
resources are being filtered", kind, h.collection.HasSynced())
+               }
+               // Log collection type for diagnosis
+               cl.logger.Infof("List: collection type is %T, HasSynced=%v", 
h.collection, h.collection.HasSynced())
+       } else {
+               configs = h.index.Lookup(namespace)
+               cl.logger.Infof("List: found %d configs for %v in namespace %s 
(synced=%v)", len(configs), kind, namespace, h.collection.HasSynced())
+               if len(configs) > 0 {
+                       for i, cfg := range configs {
+                               cl.logger.Infof("List: config[%d] %s/%s for 
%v", i, cfg.Namespace, cfg.Name, kind)
+                       }
+               } else {
+                       cl.logger.Warnf("List: found 0 configs for %v in 
namespace %s (synced=%v), checking if resources exist in cluster", kind, 
namespace, h.collection.HasSynced())
+               }
        }
 
-       return h.index.Lookup(namespace)
+       return configs
 }
 
 func getObjectMetadata(config config.Config) metav1.ObjectMeta {
diff --git a/dubbod/planet/pkg/config/kube/crdclient/types.go 
b/dubbod/planet/pkg/config/kube/crdclient/types.go
index 9e70c241..c5d62fa9 100644
--- a/dubbod/planet/pkg/config/kube/crdclient/types.go
+++ b/dubbod/planet/pkg/config/kube/crdclient/types.go
@@ -37,7 +37,9 @@ import (
        k8sioapidiscoveryv1 "k8s.io/api/discovery/v1"
        k8sioapiextensionsapiserverpkgapisapiextensionsv1 
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
        "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/apimachinery/pkg/types"
 )
 
@@ -53,13 +55,30 @@ func assignSpec[T any](dst *T, src *T) {
 func create(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta) 
(metav1.Object, error) {
        switch cfg.GroupVersionKind {
        case gvk.SubsetRule:
+               // SubsetRule uses networking.dubbo.apache.org API group, not 
networking.istio.io
+               // Use Dynamic client to access it, but reuse Istio's 
DestinationRule spec structure
                spec := cfg.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
                clonedSpec := protomarshal.Clone(spec)
                obj := &apiistioioapinetworkingv1.DestinationRule{
                        ObjectMeta: objMeta,
                }
                assignSpec(&obj.Spec, clonedSpec)
-               return 
c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).Create(context.TODO(), 
obj, metav1.CreateOptions{})
+               // Convert to unstructured for Dynamic client
+               uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to convert 
DestinationRule to unstructured: %v", err)
+               }
+               u := &unstructured.Unstructured{Object: uObj}
+               u.SetGroupVersionKind(schema.GroupVersionKind{
+                       Group:   "networking.dubbo.apache.org",
+                       Version: "v1",
+                       Kind:    "SubsetRule",
+               })
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "subsetrules",
+               }).Namespace(cfg.Namespace).Create(context.TODO(), u, 
metav1.CreateOptions{})
        case gvk.PeerAuthentication:
                spec := cfg.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
                clonedSpec := protomarshal.Clone(spec)
@@ -67,15 +86,46 @@ func create(c kube.Client, cfg config.Config, objMeta 
metav1.ObjectMeta) (metav1
                        ObjectMeta: objMeta,
                }
                assignSpec(&obj.Spec, clonedSpec)
-               return 
c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).Create(context.TODO(),
 obj, metav1.CreateOptions{})
+               uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to convert 
PeerAuthentication to unstructured: %v", err)
+               }
+               u := &unstructured.Unstructured{Object: uObj}
+               u.SetGroupVersionKind(schema.GroupVersionKind{
+                       Group:   "security.dubbo.apache.org",
+                       Version: "v1",
+                       Kind:    "PeerAuthentication",
+               })
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "security.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "peerauthentications",
+               }).Namespace(cfg.Namespace).Create(context.TODO(), u, 
metav1.CreateOptions{})
        case gvk.ServiceRoute:
+               // ServiceRoute uses networking.dubbo.apache.org API group, not 
networking.istio.io
+               // Use Dynamic client to access it, but reuse Istio's 
VirtualService spec structure
                spec := cfg.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
                clonedSpec := protomarshal.Clone(spec)
                obj := &apiistioioapinetworkingv1.VirtualService{
                        ObjectMeta: objMeta,
                }
                assignSpec(&obj.Spec, clonedSpec)
-               return 
c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).Create(context.TODO(), 
obj, metav1.CreateOptions{})
+               // Convert to unstructured for Dynamic client
+               uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to convert 
VirtualService to unstructured: %v", err)
+               }
+               u := &unstructured.Unstructured{Object: uObj}
+               u.SetGroupVersionKind(schema.GroupVersionKind{
+                       Group:   "networking.dubbo.apache.org",
+                       Version: "v1",
+                       Kind:    "ServiceRoute",
+               })
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "serviceroutes",
+               }).Namespace(cfg.Namespace).Create(context.TODO(), u, 
metav1.CreateOptions{})
        default:
                return nil, fmt.Errorf("unsupported type: %v", 
cfg.GroupVersionKind)
        }
@@ -84,13 +134,28 @@ func create(c kube.Client, cfg config.Config, objMeta 
metav1.ObjectMeta) (metav1
 func update(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta) 
(metav1.Object, error) {
        switch cfg.GroupVersionKind {
        case gvk.SubsetRule:
+               // SubsetRule uses networking.dubbo.apache.org API group, use 
Dynamic client
                spec := cfg.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
                clonedSpec := protomarshal.Clone(spec)
                obj := &apiistioioapinetworkingv1.DestinationRule{
                        ObjectMeta: objMeta,
                }
                assignSpec(&obj.Spec, clonedSpec)
-               return 
c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).Update(context.TODO(), 
obj, metav1.UpdateOptions{})
+               uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to convert 
DestinationRule to unstructured: %v", err)
+               }
+               u := &unstructured.Unstructured{Object: uObj}
+               u.SetGroupVersionKind(schema.GroupVersionKind{
+                       Group:   "networking.dubbo.apache.org",
+                       Version: "v1",
+                       Kind:    "SubsetRule",
+               })
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "subsetrules",
+               }).Namespace(cfg.Namespace).Update(context.TODO(), u, 
metav1.UpdateOptions{})
        case gvk.PeerAuthentication:
                spec := cfg.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
                clonedSpec := protomarshal.Clone(spec)
@@ -98,15 +163,44 @@ func update(c kube.Client, cfg config.Config, objMeta 
metav1.ObjectMeta) (metav1
                        ObjectMeta: objMeta,
                }
                assignSpec(&obj.Spec, clonedSpec)
-               return 
c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).Update(context.TODO(),
 obj, metav1.UpdateOptions{})
+               uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to convert 
PeerAuthentication to unstructured: %v", err)
+               }
+               u := &unstructured.Unstructured{Object: uObj}
+               u.SetGroupVersionKind(schema.GroupVersionKind{
+                       Group:   "security.dubbo.apache.org",
+                       Version: "v1",
+                       Kind:    "PeerAuthentication",
+               })
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "security.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "peerauthentications",
+               }).Namespace(cfg.Namespace).Update(context.TODO(), u, 
metav1.UpdateOptions{})
        case gvk.ServiceRoute:
+               // ServiceRoute uses networking.dubbo.apache.org API group, use 
Dynamic client
                spec := cfg.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
                clonedSpec := protomarshal.Clone(spec)
                obj := &apiistioioapinetworkingv1.VirtualService{
                        ObjectMeta: objMeta,
                }
                assignSpec(&obj.Spec, clonedSpec)
-               return 
c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).Update(context.TODO(), 
obj, metav1.UpdateOptions{})
+               uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to convert 
VirtualService to unstructured: %v", err)
+               }
+               u := &unstructured.Unstructured{Object: uObj}
+               u.SetGroupVersionKind(schema.GroupVersionKind{
+                       Group:   "networking.dubbo.apache.org",
+                       Version: "v1",
+                       Kind:    "ServiceRoute",
+               })
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "serviceroutes",
+               }).Namespace(cfg.Namespace).Update(context.TODO(), u, 
metav1.UpdateOptions{})
        default:
                return nil, fmt.Errorf("unsupported type: %v", 
cfg.GroupVersionKind)
        }
@@ -115,13 +209,28 @@ func update(c kube.Client, cfg config.Config, objMeta 
metav1.ObjectMeta) (metav1
 func updateStatus(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta) 
(metav1.Object, error) {
        switch cfg.GroupVersionKind {
        case gvk.SubsetRule:
+               // SubsetRule uses networking.dubbo.apache.org API group, use 
Dynamic client
                status := cfg.Status.(*istioioapimetav1alpha1.IstioStatus)
                clonedStatus := protomarshal.Clone(status)
                obj := &apiistioioapinetworkingv1.DestinationRule{
                        ObjectMeta: objMeta,
                }
                assignSpec(&obj.Status, clonedStatus)
-               return 
c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).UpdateStatus(context.TODO(),
 obj, metav1.UpdateOptions{})
+               uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to convert 
DestinationRule to unstructured: %v", err)
+               }
+               u := &unstructured.Unstructured{Object: uObj}
+               u.SetGroupVersionKind(schema.GroupVersionKind{
+                       Group:   "networking.dubbo.apache.org",
+                       Version: "v1",
+                       Kind:    "SubsetRule",
+               })
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "subsetrules",
+               }).Namespace(cfg.Namespace).UpdateStatus(context.TODO(), u, 
metav1.UpdateOptions{})
        case gvk.PeerAuthentication:
                status := cfg.Status.(*istioioapimetav1alpha1.IstioStatus)
                clonedStatus := protomarshal.Clone(status)
@@ -129,15 +238,44 @@ func updateStatus(c kube.Client, cfg config.Config, 
objMeta metav1.ObjectMeta) (
                        ObjectMeta: objMeta,
                }
                assignSpec(&obj.Status, clonedStatus)
-               return 
c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).UpdateStatus(context.TODO(),
 obj, metav1.UpdateOptions{})
+               uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to convert 
PeerAuthentication status to unstructured: %v", err)
+               }
+               u := &unstructured.Unstructured{Object: uObj}
+               u.SetGroupVersionKind(schema.GroupVersionKind{
+                       Group:   "security.dubbo.apache.org",
+                       Version: "v1",
+                       Kind:    "PeerAuthentication",
+               })
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "security.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "peerauthentications",
+               }).Namespace(cfg.Namespace).UpdateStatus(context.TODO(), u, 
metav1.UpdateOptions{})
        case gvk.ServiceRoute:
+               // ServiceRoute uses networking.dubbo.apache.org API group, use 
Dynamic client
                status := cfg.Status.(*istioioapimetav1alpha1.IstioStatus)
                clonedStatus := protomarshal.Clone(status)
                obj := &apiistioioapinetworkingv1.VirtualService{
                        ObjectMeta: objMeta,
                }
                assignSpec(&obj.Status, clonedStatus)
-               return 
c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).UpdateStatus(context.TODO(),
 obj, metav1.UpdateOptions{})
+               uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to convert 
VirtualService to unstructured: %v", err)
+               }
+               u := &unstructured.Unstructured{Object: uObj}
+               u.SetGroupVersionKind(schema.GroupVersionKind{
+                       Group:   "networking.dubbo.apache.org",
+                       Version: "v1",
+                       Kind:    "ServiceRoute",
+               })
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "serviceroutes",
+               }).Namespace(cfg.Namespace).UpdateStatus(context.TODO(), u, 
metav1.UpdateOptions{})
        default:
                return nil, fmt.Errorf("unsupported type: %v", 
cfg.GroupVersionKind)
        }
@@ -149,6 +287,7 @@ func patch(c kube.Client, orig config.Config, origMeta 
metav1.ObjectMeta, mod co
        }
        switch orig.GroupVersionKind {
        case gvk.SubsetRule:
+               // SubsetRule uses networking.dubbo.apache.org API group, use 
Dynamic client
                origSpec := 
orig.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
                modSpec := 
mod.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
                clonedOrigSpec := protomarshal.Clone(origSpec)
@@ -165,8 +304,11 @@ func patch(c kube.Client, orig config.Config, origMeta 
metav1.ObjectMeta, mod co
                if err != nil {
                        return nil, err
                }
-               return 
c.Dubbo().NetworkingV1().DestinationRules(orig.Namespace).
-                       Patch(context.TODO(), orig.Name, typ, patchBytes, 
metav1.PatchOptions{FieldManager: "planet-discovery"})
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "subsetrules",
+               }).Namespace(orig.Namespace).Patch(context.TODO(), orig.Name, 
typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
        case gvk.PeerAuthentication:
                origSpec := 
orig.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
                modSpec := 
mod.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
@@ -184,9 +326,13 @@ func patch(c kube.Client, orig config.Config, origMeta 
metav1.ObjectMeta, mod co
                if err != nil {
                        return nil, err
                }
-               return 
c.Dubbo().SecurityV1().PeerAuthentications(orig.Namespace).
-                       Patch(context.TODO(), orig.Name, typ, patchBytes, 
metav1.PatchOptions{FieldManager: "planet-discovery"})
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "security.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "peerauthentications",
+               }).Namespace(orig.Namespace).Patch(context.TODO(), orig.Name, 
typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
        case gvk.ServiceRoute:
+               // ServiceRoute uses networking.dubbo.apache.org API group, use 
Dynamic client
                origSpec := 
orig.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
                modSpec := 
mod.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
                clonedOrigSpec := protomarshal.Clone(origSpec)
@@ -203,8 +349,11 @@ func patch(c kube.Client, orig config.Config, origMeta 
metav1.ObjectMeta, mod co
                if err != nil {
                        return nil, err
                }
-               return c.Dubbo().NetworkingV1().VirtualServices(orig.Namespace).
-                       Patch(context.TODO(), orig.Name, typ, patchBytes, 
metav1.PatchOptions{FieldManager: "planet-discovery"})
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "serviceroutes",
+               }).Namespace(orig.Namespace).Patch(context.TODO(), orig.Name, 
typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
        default:
                return nil, fmt.Errorf("unsupported type: %v", 
orig.GroupVersionKind)
        }
@@ -217,11 +366,25 @@ func delete(c kube.Client, typ config.GroupVersionKind, 
name, namespace string,
        }
        switch typ {
        case gvk.SubsetRule:
-               return 
c.Dubbo().NetworkingV1().DestinationRules(namespace).Delete(context.TODO(), 
name, deleteOptions)
+               // SubsetRule uses networking.dubbo.apache.org API group, use 
Dynamic client
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "subsetrules",
+               }).Namespace(namespace).Delete(context.TODO(), name, 
deleteOptions)
        case gvk.PeerAuthentication:
-               return 
c.Dubbo().SecurityV1().PeerAuthentications(namespace).Delete(context.TODO(), 
name, deleteOptions)
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "security.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "peerauthentications",
+               }).Namespace(namespace).Delete(context.TODO(), name, 
deleteOptions)
        case gvk.ServiceRoute:
-               return 
c.Dubbo().NetworkingV1().VirtualServices(namespace).Delete(context.TODO(), 
name, deleteOptions)
+               // ServiceRoute uses networking.dubbo.apache.org API group, use 
Dynamic client
+               return c.Dynamic().Resource(schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "serviceroutes",
+               }).Namespace(namespace).Delete(context.TODO(), name, 
deleteOptions)
        default:
                return fmt.Errorf("unsupported type: %v", typ)
        }
@@ -301,7 +464,32 @@ var translationMap = map[config.GroupVersionKind]func(r 
runtime.Object) config.C
                }
        },
        gvk.SubsetRule: func(r runtime.Object) config.Config {
-               obj := r.(*apiistioioapinetworkingv1.DestinationRule)
+               var obj *apiistioioapinetworkingv1.DestinationRule
+               // Handle unstructured objects from Dynamic client
+               // First try to convert from unstructured, as Dynamic client 
returns unstructured objects
+               // Note: r may be controllers.Object which embeds 
runtime.Object, so we need to check the concrete type
+               switch v := r.(type) {
+               case *unstructured.Unstructured:
+                       obj = &apiistioioapinetworkingv1.DestinationRule{}
+                       if err := 
runtime.DefaultUnstructuredConverter.FromUnstructured(v.Object, obj); err != 
nil {
+                               panic(fmt.Sprintf("failed to convert 
unstructured to DestinationRule: %v", err))
+                       }
+               case *apiistioioapinetworkingv1.DestinationRule:
+                       // Handle typed objects from Istio client
+                       obj = v
+               default:
+                       // Fallback: try to convert any runtime.Object to 
unstructured first, then to DestinationRule
+                       uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(r)
+                       if err == nil {
+                               u := &unstructured.Unstructured{Object: uObj}
+                               obj = 
&apiistioioapinetworkingv1.DestinationRule{}
+                               if err := 
runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, obj); err != 
nil {
+                                       panic(fmt.Sprintf("failed to convert 
object %T to DestinationRule: %v", r, err))
+                               }
+                       } else {
+                               panic(fmt.Sprintf("unexpected object type for 
SubsetRule: %T, expected *unstructured.Unstructured or 
*apiistioioapinetworkingv1.DestinationRule, conversion error: %v", r, err))
+                       }
+               }
                return config.Config{
                        Meta: config.Meta{
                                GroupVersionKind:  gvk.SubsetRule,
@@ -338,7 +526,27 @@ var translationMap = map[config.GroupVersionKind]func(r 
runtime.Object) config.C
                }
        },
        gvk.PeerAuthentication: func(r runtime.Object) config.Config {
-               obj := r.(*apiistioioapisecurityv1.PeerAuthentication)
+               var obj *apiistioioapisecurityv1.PeerAuthentication
+               switch v := r.(type) {
+               case *unstructured.Unstructured:
+                       obj = &apiistioioapisecurityv1.PeerAuthentication{}
+                       if err := 
runtime.DefaultUnstructuredConverter.FromUnstructured(v.Object, obj); err != 
nil {
+                               panic(fmt.Sprintf("failed to convert 
unstructured to PeerAuthentication: %v", err))
+                       }
+               case *apiistioioapisecurityv1.PeerAuthentication:
+                       obj = v
+               default:
+                       uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(r)
+                       if err == nil {
+                               u := &unstructured.Unstructured{Object: uObj}
+                               obj = 
&apiistioioapisecurityv1.PeerAuthentication{}
+                               if err := 
runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, obj); err != 
nil {
+                                       panic(fmt.Sprintf("failed to convert 
object %T to PeerAuthentication: %v", r, err))
+                               }
+                       } else {
+                               panic(fmt.Sprintf("unexpected object type for 
PeerAuthentication: %T, conversion error: %v", r, err))
+                       }
+               }
                return config.Config{
                        Meta: config.Meta{
                                GroupVersionKind:  gvk.PeerAuthentication,
@@ -466,7 +674,32 @@ var translationMap = map[config.GroupVersionKind]func(r 
runtime.Object) config.C
                }
        },
        gvk.ServiceRoute: func(r runtime.Object) config.Config {
-               obj := r.(*apiistioioapinetworkingv1.VirtualService)
+               var obj *apiistioioapinetworkingv1.VirtualService
+               // Handle unstructured objects from Dynamic client
+               // First try to convert from unstructured, as Dynamic client 
returns unstructured objects
+               // Note: r may be controllers.Object which embeds 
runtime.Object, so we need to check the concrete type
+               switch v := r.(type) {
+               case *unstructured.Unstructured:
+                       obj = &apiistioioapinetworkingv1.VirtualService{}
+                       if err := 
runtime.DefaultUnstructuredConverter.FromUnstructured(v.Object, obj); err != 
nil {
+                               panic(fmt.Sprintf("failed to convert 
unstructured to VirtualService: %v", err))
+                       }
+               case *apiistioioapinetworkingv1.VirtualService:
+                       // Handle typed objects from Istio client
+                       obj = v
+               default:
+                       // Fallback: try to convert any runtime.Object to 
unstructured first, then to VirtualService
+                       uObj, err := 
runtime.DefaultUnstructuredConverter.ToUnstructured(r)
+                       if err == nil {
+                               u := &unstructured.Unstructured{Object: uObj}
+                               obj = 
&apiistioioapinetworkingv1.VirtualService{}
+                               if err := 
runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, obj); err != 
nil {
+                                       panic(fmt.Sprintf("failed to convert 
object %T to VirtualService: %v", r, err))
+                               }
+                       } else {
+                               panic(fmt.Sprintf("unexpected object type for 
ServiceRoute: %T, expected *unstructured.Unstructured or 
*apiistioioapinetworkingv1.VirtualService, conversion error: %v", r, err))
+                       }
+               }
                return config.Config{
                        Meta: config.Meta{
                                GroupVersionKind:  gvk.ServiceRoute,
diff --git a/dubbod/planet/pkg/model/push_context.go 
b/dubbod/planet/pkg/model/push_context.go
index fd8f1d8c..7990cd3b 100644
--- a/dubbod/planet/pkg/model/push_context.go
+++ b/dubbod/planet/pkg/model/push_context.go
@@ -19,12 +19,14 @@ package model
 
 import (
        "cmp"
-       "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
-       networking "istio.io/api/networking/v1alpha3"
        "sort"
        "sync"
        "time"
 
+       "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+       networking "istio.io/api/networking/v1alpha3"
+
        
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/serviceregistry/provider"
        "github.com/apache/dubbo-kubernetes/pkg/cluster"
        "github.com/apache/dubbo-kubernetes/pkg/config"
@@ -139,6 +141,9 @@ type serviceRouteIndex struct {
 
        // Map of VS hostname -> referenced hostnames
        referencedDestinations map[string]sets.String
+
+       // hostToRoutes keeps the resolved VirtualServices keyed by host
+       hostToRoutes map[host.Name][]config.Config
 }
 
 type subsetRuleIndex struct {
@@ -175,6 +180,7 @@ func newServiceRouteIndex() serviceRouteIndex {
        out := serviceRouteIndex{
                delegates:              map[ConfigKey][]ConfigKey{},
                referencedDestinations: map[string]sets.String{},
+               hostToRoutes:           map[host.Name][]config.Config{},
        }
        return out
 }
@@ -505,7 +511,10 @@ func (ps *PushContext) initServiceRegistry(env 
*Environment, configsUpdate sets.
 }
 
 func (ps *PushContext) createNewContext(env *Environment) {
+       log.Infof("createNewContext: creating new PushContext (full 
initialization)")
        ps.initServiceRegistry(env, nil)
+       ps.initServiceRoutes(env)
+       ps.initSubsetRules(env)
 }
 
 func (ps *PushContext) updateContext(env *Environment, oldPushContext 
*PushContext, pushReq *PushRequest) {
@@ -518,14 +527,45 @@ func (ps *PushContext) updateContext(env *Environment, 
oldPushContext *PushConte
 
        // Check if serviceRoutes have changed base on:
        // 1. ServiceRoute updates in ConfigsUpdated
-       serviceRoutesChanged := pushReq != nil && 
(HasConfigsOfKind(pushReq.ConfigsUpdated, kind.ServiceRoute) ||
+       // 2. Full push (Full: true) - always re-initialize on full push
+       serviceRoutesChanged := pushReq != nil && (pushReq.Full || 
HasConfigsOfKind(pushReq.ConfigsUpdated, kind.ServiceRoute) ||
                len(pushReq.AddressesUpdated) > 0)
 
-       // Check if serviceRoutes have changed base on:
+       if pushReq != nil {
+               serviceRouteCount := 0
+               for cfg := range pushReq.ConfigsUpdated {
+                       if cfg.Kind == kind.ServiceRoute {
+                               serviceRouteCount++
+                       }
+               }
+               if serviceRouteCount > 0 {
+                       log.Infof("updateContext: detected %d ServiceRoute 
config changes", serviceRouteCount)
+               }
+       }
+
+       // Check if subsetRules have changed base on:
        // 1. SubsetRule updates in ConfigsUpdated
-       subsetRulesChanged := pushReq != nil && 
(HasConfigsOfKind(pushReq.ConfigsUpdated, kind.SubsetRule) ||
+       // 2. Full push (Full: true) - always re-initialize on full push
+       subsetRulesChanged := pushReq != nil && (pushReq.Full || 
HasConfigsOfKind(pushReq.ConfigsUpdated, kind.SubsetRule) ||
                len(pushReq.AddressesUpdated) > 0)
 
+       if pushReq != nil {
+               subsetRuleCount := 0
+               for cfg := range pushReq.ConfigsUpdated {
+                       if cfg.Kind == kind.SubsetRule {
+                               subsetRuleCount++
+                       }
+               }
+               if subsetRuleCount > 0 {
+                       log.Infof("updateContext: detected %d SubsetRule config 
changes", subsetRuleCount)
+               }
+               if pushReq.Full {
+                       log.Infof("updateContext: Full push requested, will 
re-initialize SubsetRule and ServiceRoute indexes")
+               }
+               log.Debugf("updateContext: subsetRulesChanged=%v, 
serviceRoutesChanged=%v, pushReq.ConfigsUpdated size=%d, Full=%v",
+                       subsetRulesChanged, serviceRoutesChanged, 
len(pushReq.ConfigsUpdated), pushReq != nil && pushReq.Full)
+       }
+
        // Also check if the actual number of services has changed
        // This handles cases where Kubernetes Services are added/removed 
without ServiceEntry updates
        if !servicesChanged && oldPushContext != nil {
@@ -551,14 +591,18 @@ func (ps *PushContext) updateContext(env *Environment, 
oldPushContext *PushConte
        }
 
        if serviceRoutesChanged {
+               log.Infof("updateContext: ServiceRoutes changed, 
re-initializing ServiceRoute index")
                ps.initServiceRoutes(env)
        } else {
+               log.Debugf("updateContext: ServiceRoutes unchanged, reusing old 
ServiceRoute index")
                ps.serviceRouteIndex = oldPushContext.serviceRouteIndex
        }
 
        if subsetRulesChanged {
+               log.Infof("updateContext: SubsetRules changed, re-initializing 
SubsetRule index")
                ps.initSubsetRules(env)
        } else {
+               log.Debugf("updateContext: SubsetRules unchanged, reusing old 
SubsetRule index")
                ps.subsetRuleIndex = oldPushContext.subsetRuleIndex
        }
 
@@ -612,15 +656,34 @@ func (ps *PushContext) GetAllServices() []*Service {
 }
 
 func (ps *PushContext) initServiceRoutes(env *Environment) {
+       log.Infof("initServiceRoutes: starting ServiceRoute initialization")
        ps.serviceRouteIndex.referencedDestinations = map[string]sets.String{}
        serviceroutes := env.List(gvk.ServiceRoute, NamespaceAll)
+       log.Infof("initServiceRoutes: found %d ServiceRoute configs", 
len(serviceroutes))
        sroutes := make([]config.Config, len(serviceroutes))
 
        for i, r := range serviceroutes {
                sroutes[i] = resolveServiceRouteShortnames(r)
+               if vs, ok := r.Spec.(*networking.VirtualService); ok {
+                       log.Infof("initServiceRoutes: ServiceRoute %s/%s with 
hosts %v and %d HTTP routes",
+                               r.Namespace, r.Name, vs.Hosts, len(vs.Http))
+               }
        }
        sroutes, ps.serviceRouteIndex.delegates = 
mergeServiceRoutesIfNeeded(sroutes, ps.exportToDefaults.serviceRoute)
 
+       hostToRoutes := make(map[host.Name][]config.Config)
+       for i := range sroutes {
+               vs := sroutes[i].Spec.(*networking.VirtualService)
+               for idx, h := range vs.Hosts {
+                       resolvedHost := string(ResolveShortnameToFQDN(h, 
sroutes[i].Meta))
+                       vs.Hosts[idx] = resolvedHost
+                       hostName := host.Name(resolvedHost)
+                       hostToRoutes[hostName] = append(hostToRoutes[hostName], 
sroutes[i])
+                       log.Debugf("initServiceRoutes: indexed ServiceRoute 
%s/%s for hostname %s", sroutes[i].Namespace, sroutes[i].Name, hostName)
+               }
+       }
+       ps.serviceRouteIndex.hostToRoutes = hostToRoutes
+       log.Infof("initServiceRoutes: indexed ServiceRoutes for %d hostnames", 
len(hostToRoutes))
 }
 
 // sortConfigBySelectorAndCreationTime sorts the list of config objects based 
on priority and creation time.
@@ -708,16 +771,42 @@ func (ps *PushContext) setSubsetRules(configs 
[]config.Config) {
        ps.subsetRuleIndex.namespaceLocal = namespaceLocalSubRules
        ps.subsetRuleIndex.exportedByNamespace = exportedDestRulesByNamespace
        ps.subsetRuleIndex.rootNamespaceLocal = rootNamespaceLocalDestRules
+
+       // Log indexing results
+       log.Infof("setSubsetRules: indexed %d namespaces with local rules", 
len(namespaceLocalSubRules))
+       for ns, rules := range namespaceLocalSubRules {
+               totalRules := 0
+               for _, ruleList := range rules.specificSubRules {
+                       totalRules += len(ruleList)
+               }
+               log.Infof("setSubsetRules: namespace %s has %d DestinationRules 
with %d specific hostnames", ns, totalRules, len(rules.specificSubRules))
+               for hostname := range rules.specificSubRules {
+                       log.Debugf("setSubsetRules: namespace %s has rules for 
hostname %s", ns, hostname)
+               }
+       }
+       log.Infof("setSubsetRules: indexed %d namespaces with exported rules", 
len(exportedDestRulesByNamespace))
+       if rootNamespaceLocalDestRules != nil {
+               totalRootRules := 0
+               for _, ruleList := range 
rootNamespaceLocalDestRules.specificSubRules {
+                       totalRootRules += len(ruleList)
+               }
+               log.Infof("setSubsetRules: root namespace has %d 
DestinationRules with %d specific hostnames", totalRootRules, 
len(rootNamespaceLocalDestRules.specificSubRules))
+       }
 }
 
 func (ps *PushContext) initSubsetRules(env *Environment) {
        configs := env.List(gvk.SubsetRule, NamespaceAll)
+       log.Infof("initSubsetRules: found %d SubsetRule configs", len(configs))
 
        // values returned from ConfigStore.List are immutable.
        // Therefore, we make a copy
        subRules := make([]config.Config, len(configs))
        for i := range subRules {
                subRules[i] = configs[i]
+               if dr, ok := configs[i].Spec.(*networking.DestinationRule); ok {
+                       log.Infof("initSubsetRules: SubsetRule %s/%s for host 
%s with %d subsets",
+                               configs[i].Namespace, configs[i].Name, dr.Host, 
len(dr.Subsets))
+               }
        }
 
        ps.setSubsetRules(subRules)
@@ -749,6 +838,103 @@ func (ps *PushContext) initServiceAccounts(env 
*Environment, services []*Service
        }
 }
 
+// ServiceRouteForHost returns the first ServiceRoute (VirtualService) that 
matches the given host.
+func (ps *PushContext) ServiceRouteForHost(hostname host.Name) 
*networking.VirtualService {
+       routes := ps.serviceRouteIndex.hostToRoutes[hostname]
+       if len(routes) == 0 {
+               log.Debugf("ServiceRouteForHost: no ServiceRoute found for 
hostname %s", hostname)
+               return nil
+       }
+       if vs, ok := routes[0].Spec.(*networking.VirtualService); ok {
+               log.Infof("ServiceRouteForHost: found ServiceRoute %s/%s for 
hostname %s with %d HTTP routes",
+                       routes[0].Namespace, routes[0].Name, hostname, 
len(vs.Http))
+               return vs
+       }
+       log.Warnf("ServiceRouteForHost: ServiceRoute %s/%s for hostname %s is 
not a VirtualService",
+               routes[0].Namespace, routes[0].Name, hostname)
+       return nil
+}
+
+// DestinationRuleForService returns the first DestinationRule (SubsetRule) 
applicable to the service hostname/namespace.
+func (ps *PushContext) DestinationRuleForService(namespace string, hostname 
host.Name) *networking.DestinationRule {
+       log.Debugf("DestinationRuleForService: looking for DestinationRule for 
%s/%s", namespace, hostname)
+
+       // Check namespace-local rules first
+       if nsRules := ps.subsetRuleIndex.namespaceLocal[namespace]; nsRules != 
nil {
+               log.Debugf("DestinationRuleForService: checking namespace-local 
rules for %s (found %d specific rules)", namespace, 
len(nsRules.specificSubRules))
+               if dr := firstDestinationRule(nsRules, hostname); dr != nil {
+                       log.Infof("DestinationRuleForService: found 
DestinationRule in namespace-local index for %s/%s with %d subsets", namespace, 
hostname, len(dr.Subsets))
+                       return dr
+               }
+       } else {
+               log.Debugf("DestinationRuleForService: no namespace-local rules 
for namespace %s", namespace)
+       }
+
+       // Check exported rules
+       log.Debugf("DestinationRuleForService: checking exported rules (found 
%d exported namespaces)", len(ps.subsetRuleIndex.exportedByNamespace))
+       for ns, exported := range ps.subsetRuleIndex.exportedByNamespace {
+               if dr := firstDestinationRule(exported, hostname); dr != nil {
+                       log.Infof("DestinationRuleForService: found 
DestinationRule in exported rules from namespace %s for %s/%s with %d subsets", 
ns, namespace, hostname, len(dr.Subsets))
+                       return dr
+               }
+       }
+
+       // Finally, check root namespace scoped rules
+       if rootRules := ps.subsetRuleIndex.rootNamespaceLocal; rootRules != nil 
{
+               log.Debugf("DestinationRuleForService: checking root namespace 
rules (found %d specific rules)", len(rootRules.specificSubRules))
+               if dr := firstDestinationRule(rootRules, hostname); dr != nil {
+                       log.Infof("DestinationRuleForService: found 
DestinationRule in root namespace for %s/%s with %d subsets", namespace, 
hostname, len(dr.Subsets))
+                       return dr
+               }
+       }
+
+       log.Warnf("DestinationRuleForService: no DestinationRule found for 
%s/%s", namespace, hostname)
+       return nil
+}
+
+// SubsetLabelsForHost returns the label selector for a subset defined in 
DestinationRule.
+func (ps *PushContext) SubsetLabelsForHost(namespace string, hostname 
host.Name, subset string) labels.Instance {
+       if subset == "" {
+               return nil
+       }
+       rule := ps.DestinationRuleForService(namespace, hostname)
+       if rule == nil {
+               return nil
+       }
+       for _, ss := range rule.Subsets {
+               if ss.Name == subset {
+                       return labels.Instance(ss.Labels)
+               }
+       }
+       return nil
+}
+
+func firstDestinationRule(csr *consolidatedSubRules, hostname host.Name) 
*networking.DestinationRule {
+       if csr == nil {
+               log.Debugf("firstDestinationRule: consolidatedSubRules is nil 
for hostname %s", hostname)
+               return nil
+       }
+       if rules := csr.specificSubRules[hostname]; len(rules) > 0 {
+               log.Debugf("firstDestinationRule: found %d rules for hostname 
%s", len(rules), hostname)
+               if dr, ok := rules[0].rule.Spec.(*networking.DestinationRule); 
ok {
+                       log.Debugf("firstDestinationRule: successfully cast to 
DestinationRule for hostname %s", hostname)
+                       return dr
+               } else {
+                       log.Warnf("firstDestinationRule: failed to cast rule to 
DestinationRule for hostname %s", hostname)
+               }
+       } else {
+               log.Debugf("firstDestinationRule: no specific rules found for 
hostname %s (available hostnames: %v)", hostname, func() []string {
+                       hosts := make([]string, 0, len(csr.specificSubRules))
+                       for h := range csr.specificSubRules {
+                               hosts = append(hosts, string(h))
+                       }
+                       return hosts
+               }())
+       }
+       // TODO: support wildcard hosts
+       return nil
+}
+
 func (ps *PushContext) DelegateServiceRoutes(vses []config.Config) 
[]ConfigHash {
        var out []ConfigHash
        for _, vs := range vses {
diff --git a/dubbod/planet/pkg/model/serviceroute.go 
b/dubbod/planet/pkg/model/serviceroute.go
index 46a2819f..e455b7e3 100644
--- a/dubbod/planet/pkg/model/serviceroute.go
+++ b/dubbod/planet/pkg/model/serviceroute.go
@@ -1,13 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package model
 
 import (
+       "strings"
+
        "github.com/apache/dubbo-kubernetes/pkg/config"
        "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
        "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "google.golang.org/protobuf/proto"
        networking "istio.io/api/networking/v1alpha3"
        "k8s.io/apimachinery/pkg/types"
-       "strings"
 )
 
 func resolveServiceRouteShortnames(config config.Config) config.Config {
@@ -209,14 +228,21 @@ func mergeHTTPMatchRequests(root, delegate 
[]*networking.HTTPMatchRequest) (out
 }
 
 func mergeHTTPMatchRequest(root, delegate *networking.HTTPMatchRequest) 
*networking.HTTPMatchRequest {
-       // nolint: govet
-       out := *delegate
+       cloned := proto.Clone(delegate)
+       out, ok := cloned.(*networking.HTTPMatchRequest)
+       if !ok {
+               log.Warnf("mergeHTTPMatchRequest: failed to clone 
HTTPMatchRequest for delegate %s", delegate.GetName())
+               return nil
+       }
+       if out == nil {
+               return nil
+       }
        if out.Name == "" {
                out.Name = root.Name
        } else if root.Name != "" {
                out.Name = root.Name + "-" + out.Name
        }
-       return &out
+       return out
 }
 
 func hasConflict(root, leaf *networking.HTTPMatchRequest) bool {
diff --git a/dubbod/planet/pkg/model/subsetrule.go 
b/dubbod/planet/pkg/model/subsetrule.go
index f6dab28a..954b8cde 100644
--- a/dubbod/planet/pkg/model/subsetrule.go
+++ b/dubbod/planet/pkg/model/subsetrule.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package model
 
 import (
diff --git a/dubbod/planet/pkg/networking/grpcgen/cds.go 
b/dubbod/planet/pkg/networking/grpcgen/cds.go
index 4a3676b8..3efff2bb 100644
--- a/dubbod/planet/pkg/networking/grpcgen/cds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/cds.go
@@ -19,6 +19,7 @@ package grpcgen
 
 import (
        "fmt"
+
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
        discovery 
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
 
@@ -113,7 +114,8 @@ func newClusterBuilder(node *model.Proxy, push 
*model.PushContext, defaultCluste
 
 func (b *clusterBuilder) build() []*cluster.Cluster {
        var defaultCluster *cluster.Cluster
-       if b.filter.Contains(b.defaultClusterName) {
+       defaultRequested := b.filter == nil || 
b.filter.Contains(b.defaultClusterName)
+       if defaultRequested {
                defaultCluster = b.edsCluster(b.defaultClusterName)
                // CRITICAL: For gRPC proxyless, we need to set CommonLbConfig 
to handle endpoint health status
                // Following Istio's implementation, we should include 
UNHEALTHY and DRAINING endpoints
@@ -136,6 +138,7 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
                                core.HealthStatus_DEGRADED,
                        },
                }
+               log.Infof("clusterBuilder.build: generated default cluster %s", 
b.defaultClusterName)
        }
 
        subsetClusters := b.applyDestinationRule(defaultCluster)
@@ -143,7 +146,10 @@ func (b *clusterBuilder) build() []*cluster.Cluster {
        if defaultCluster != nil {
                out = append(out, defaultCluster)
        }
-       return append(out, subsetClusters...)
+       result := append(out, subsetClusters...)
+       log.Infof("clusterBuilder.build: generated %d clusters total (1 default 
+ %d subsets) for %s",
+               len(result), len(subsetClusters), b.defaultClusterName)
+       return result
 }
 
 func (b *clusterBuilder) edsCluster(name string) *cluster.Cluster {
@@ -167,8 +173,71 @@ func (b *clusterBuilder) edsCluster(name string) 
*cluster.Cluster {
 
 func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster) 
(subsetClusters []*cluster.Cluster) {
        if b.svc == nil || b.port == nil {
+               log.Warnf("applyDestinationRule: service or port is nil for 
%s", b.defaultClusterName)
+               return nil
+       }
+       log.Infof("applyDestinationRule: looking for DestinationRule for 
service %s/%s (hostname=%s, port=%d)",
+               b.svc.Attributes.Namespace, b.svc.Attributes.Name, b.hostname, 
b.portNum)
+       dr := b.push.DestinationRuleForService(b.svc.Attributes.Namespace, 
b.hostname)
+       if dr == nil {
+               log.Warnf("applyDestinationRule: no DestinationRule found for 
%s/%s", b.svc.Attributes.Namespace, b.hostname)
                return nil
        }
-       // TODO
-       return
+       if len(dr.Subsets) == 0 {
+               log.Warnf("applyDestinationRule: DestinationRule found for 
%s/%s but has no subsets", b.svc.Attributes.Namespace, b.hostname)
+               return nil
+       }
+
+       log.Infof("applyDestinationRule: found DestinationRule for %s/%s with 
%d subsets, defaultCluster requested=%v",
+               b.svc.Attributes.Namespace, b.hostname, len(dr.Subsets), 
defaultCluster != nil)
+
+       var commonLbConfig *cluster.Cluster_CommonLbConfig
+       if defaultCluster != nil {
+               commonLbConfig = defaultCluster.CommonLbConfig
+       } else {
+               commonLbConfig = &cluster.Cluster_CommonLbConfig{
+                       OverrideHostStatus: &core.HealthStatusSet{
+                               Statuses: []core.HealthStatus{
+                                       core.HealthStatus_HEALTHY,
+                                       core.HealthStatus_UNHEALTHY,
+                                       core.HealthStatus_DRAINING,
+                                       core.HealthStatus_UNKNOWN,
+                                       core.HealthStatus_DEGRADED,
+                               },
+                       },
+               }
+       }
+
+       defaultClusterRequested := defaultCluster != nil
+       if b.filter != nil {
+               defaultClusterRequested = 
b.filter.Contains(b.defaultClusterName)
+       }
+
+       for _, subset := range dr.Subsets {
+               if subset == nil || subset.Name == "" {
+                       continue
+               }
+               clusterName := 
model.BuildSubsetKey(model.TrafficDirectionOutbound, subset.Name, b.hostname, 
b.portNum)
+
+               // CRITICAL: Always generate subset clusters if default cluster 
is requested
+               // This is essential for RDS WeightedCluster to work correctly
+               shouldGenerate := true
+               if b.filter != nil && !b.filter.Contains(clusterName) {
+                       // Subset cluster not explicitly requested, but 
generate it if default cluster was requested
+                       shouldGenerate = defaultClusterRequested
+               }
+
+               if !shouldGenerate {
+                       log.Debugf("applyDestinationRule: skipping subset 
cluster %s (not requested and default not requested)", clusterName)
+                       continue
+               }
+
+               log.Infof("applyDestinationRule: generating subset cluster %s 
for subset %s", clusterName, subset.Name)
+               subsetCluster := b.edsCluster(clusterName)
+               subsetCluster.CommonLbConfig = commonLbConfig
+               subsetClusters = append(subsetClusters, subsetCluster)
+       }
+
+       log.Infof("applyDestinationRule: generated %d subset clusters for 
%s/%s", len(subsetClusters), b.svc.Attributes.Namespace, b.hostname)
+       return subsetClusters
 }
diff --git a/dubbod/planet/pkg/networking/grpcgen/rds.go 
b/dubbod/planet/pkg/networking/grpcgen/rds.go
index 37c74b57..11dcf120 100644
--- a/dubbod/planet/pkg/networking/grpcgen/rds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/rds.go
@@ -19,13 +19,17 @@ package grpcgen
 
 import (
        "fmt"
-       "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
-       discovery 
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
        "strconv"
        "strings"
 
+       "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
+       discovery 
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
+       "github.com/apache/dubbo-kubernetes/pkg/config/host"
        route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+       "google.golang.org/protobuf/types/known/wrapperspb"
+       networking "istio.io/api/networking/v1alpha3"
 )
 
 func (g *GrpcConfigGenerator) BuildHTTPRoutes(node *model.Proxy, push 
*model.PushContext, routeNames []string) model.Resources {
@@ -77,28 +81,28 @@ func buildHTTPRoute(node *model.Proxy, push 
*model.PushContext, routeName string
                }
                domains = append(domains, "*") // Wildcard for any domain - 
LEAST SPECIFIC
 
+               outboundRoutes := []*route.Route{
+                       defaultSingleClusterRoute(routeName),
+               }
+               if vs := push.ServiceRouteForHost(host.Name(hostStr)); vs != 
nil {
+                       log.Infof("buildHTTPRoute: found ServiceRoute for host 
%s with %d HTTP routes", hostStr, len(vs.Http))
+                       if routes := buildRoutesFromServiceRoute(vs, 
host.Name(hostStr), parsedPort); len(routes) > 0 {
+                               log.Infof("buildHTTPRoute: built %d weighted 
routes from ServiceRoute for host %s", len(routes), hostStr)
+                               outboundRoutes = routes
+                       } else {
+                               log.Warnf("buildHTTPRoute: ServiceRoute found 
but no routes built for host %s", hostStr)
+                       }
+               } else {
+                       log.Debugf("buildHTTPRoute: no ServiceRoute found for 
host %s, using default route", hostStr)
+               }
+
                return &route.RouteConfiguration{
                        Name: routeName,
                        VirtualHosts: []*route.VirtualHost{
                                {
                                        Name:    fmt.Sprintf("%s|http|%d", 
hostStr, parsedPort),
                                        Domains: domains,
-                                       Routes: []*route.Route{
-                                               {
-                                                       Match: 
&route.RouteMatch{
-                                                               PathSpecifier: 
&route.RouteMatch_Prefix{
-                                                                       Prefix: 
"/",
-                                                               },
-                                                       },
-                                                       Action: 
&route.Route_Route{
-                                                               Route: 
&route.RouteAction{
-                                                                       
ClusterSpecifier: &route.RouteAction_Cluster{
-                                                                               
Cluster: routeName, // Use routeName (cluster name)
-                                                                       },
-                                                               },
-                                                       },
-                                               },
-                                       },
+                                       Routes:  outboundRoutes,
                                },
                        },
                }
@@ -126,3 +130,110 @@ func buildHTTPRoute(node *model.Proxy, push 
*model.PushContext, routeName string
                },
        }
 }
+
+func defaultSingleClusterRoute(clusterName string) *route.Route {
+       return &route.Route{
+               Match: &route.RouteMatch{
+                       PathSpecifier: &route.RouteMatch_Prefix{
+                               Prefix: "/",
+                       },
+               },
+               Action: &route.Route_Route{
+                       Route: &route.RouteAction{
+                               ClusterSpecifier: &route.RouteAction_Cluster{
+                                       Cluster: clusterName,
+                               },
+                       },
+               },
+       }
+}
+
+func buildRoutesFromServiceRoute(vs *networking.VirtualService, hostName 
host.Name, defaultPort int) []*route.Route {
+       if vs == nil || len(vs.Http) == 0 {
+               return nil
+       }
+       var routes []*route.Route
+       for _, httpRoute := range vs.Http {
+               if httpRoute == nil {
+                       continue
+               }
+               if built := buildRouteFromHTTPRoute(httpRoute, hostName, 
defaultPort); built != nil {
+                       routes = append(routes, built)
+               }
+       }
+       return routes
+}
+
+func buildRouteFromHTTPRoute(httpRoute *networking.HTTPRoute, hostName 
host.Name, defaultPort int) *route.Route {
+       if httpRoute == nil || len(httpRoute.Route) == 0 {
+               log.Warnf("buildRouteFromHTTPRoute: httpRoute is nil or has no 
routes")
+               return nil
+       }
+       log.Infof("buildRouteFromHTTPRoute: processing HTTPRoute with %d route 
destinations", len(httpRoute.Route))
+       weights := make([]*route.WeightedCluster_ClusterWeight, 0, 
len(httpRoute.Route))
+       var totalWeight uint32
+       for i, dest := range httpRoute.Route {
+               if dest == nil {
+                       log.Warnf("buildRouteFromHTTPRoute: route[%d] is nil", 
i)
+                       continue
+               }
+               destination := dest.Destination
+               if destination == nil {
+                       log.Warnf("buildRouteFromHTTPRoute: route[%d] has nil 
Destination (weight=%d), creating default destination with host=%s",
+                               i, dest.Weight, hostName)
+                       destination = &networking.Destination{
+                               Host: string(hostName),
+                       }
+               } else {
+                       log.Debugf("buildRouteFromHTTPRoute: route[%d] 
Destination: host=%s, subset=%s, port=%v, weight=%d",
+                               i, destination.Host, destination.Subset, 
destination.Port, dest.Weight)
+               }
+               targetHost := destination.Host
+               if targetHost == "" {
+                       targetHost = string(hostName)
+               }
+               targetPort := defaultPort
+               if destination.Port != nil && destination.Port.Number != 0 {
+                       targetPort = int(destination.Port.Number)
+               }
+               subsetName := destination.Subset
+               clusterName := 
model.BuildSubsetKey(model.TrafficDirectionOutbound, subsetName, 
host.Name(targetHost), targetPort)
+               weight := dest.Weight
+               if weight <= 0 {
+                       weight = 1
+               }
+               totalWeight += uint32(weight)
+               log.Infof("buildRouteFromHTTPRoute: route[%d] -> cluster=%s, 
subset=%s, weight=%d, host=%s, port=%d",
+                       i, clusterName, subsetName, weight, targetHost, 
targetPort)
+               weights = append(weights, &route.WeightedCluster_ClusterWeight{
+                       Name:   clusterName,
+                       Weight: wrapperspb.UInt32(uint32(weight)),
+               })
+       }
+       if len(weights) == 0 {
+               log.Warnf("buildRouteFromHTTPRoute: no valid weights generated")
+               return nil
+       }
+       weightedClusters := &route.WeightedCluster{
+               Clusters: weights,
+       }
+       if totalWeight > 0 {
+               weightedClusters.TotalWeight = wrapperspb.UInt32(totalWeight)
+       }
+       log.Infof("buildRouteFromHTTPRoute: built WeightedCluster with %d 
clusters, totalWeight=%d", len(weights), totalWeight)
+
+       return &route.Route{
+               Match: &route.RouteMatch{
+                       PathSpecifier: &route.RouteMatch_Prefix{
+                               Prefix: "/",
+                       },
+               },
+               Action: &route.Route_Route{
+                       Route: &route.RouteAction{
+                               ClusterSpecifier: 
&route.RouteAction_WeightedClusters{
+                                       WeightedClusters: weightedClusters,
+                               },
+                       },
+               },
+       }
+}
diff --git a/dubbod/planet/pkg/xds/ads.go b/dubbod/planet/pkg/xds/ads.go
index 3ea03210..8a598f72 100644
--- a/dubbod/planet/pkg/xds/ads.go
+++ b/dubbod/planet/pkg/xds/ads.go
@@ -19,12 +19,13 @@ package xds
 
 import (
        "fmt"
-       "github.com/apache/dubbo-kubernetes/pkg/maps"
        "strconv"
        "strings"
        "sync/atomic"
        "time"
 
+       "github.com/apache/dubbo-kubernetes/pkg/maps"
+
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
        v3 "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/xds/v3"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
@@ -332,15 +333,12 @@ func (s *DiscoveryServer) processRequest(req 
*discovery.DiscoveryRequest, con *C
                resourceNamesStr = " [wildcard]"
        }
 
+       // Always log at INFO so手工调用 grpcurl 也能看到完整请求轨迹
        if shouldRespond {
-               // Log NEW requests at INFO level - these are triggered by 
grpcurl requests
-               // This makes it easy to see when a grpcurl request triggers 
xDS configuration
                log.Infof("%s: REQ %s resources:%d nonce:%s%s (will respond)", 
stype,
                        con.ID(), len(req.ResourceNames), req.ResponseNonce, 
resourceNamesStr)
        } else {
-               // Log ACK/ignored requests at DEBUG level to reduce noise
-               // These are normal XDS protocol ACKs, not new requests from 
grpcurl
-               log.Debugf("%s: REQ %s resources:%d nonce:%s%s (ACK/ignored)", 
stype,
+               log.Infof("%s: REQ %s resources:%d nonce:%s%s (ACK/ignored)", 
stype,
                        con.ID(), len(req.ResourceNames), req.ResponseNonce, 
resourceNamesStr)
        }
 
diff --git a/dubbod/planet/pkg/xds/delta.go b/dubbod/planet/pkg/xds/delta.go
index c75a7b2f..18d0260f 100644
--- a/dubbod/planet/pkg/xds/delta.go
+++ b/dubbod/planet/pkg/xds/delta.go
@@ -19,10 +19,11 @@ package xds
 
 import (
        "errors"
-       dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
        "strings"
        "time"
 
+       dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
+
        dubbogrpc "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/grpc"
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
        v3 "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/xds/v3"
@@ -67,12 +68,16 @@ func (s *DiscoveryServer) StreamDeltas(stream 
DeltaDiscoveryStream) error {
                return status.Errorf(codes.ResourceExhausted, "request rate 
limit exceeded: %v", err)
        }
 
-       // TODO authenticate
+       ids, err := s.authenticate(ctx)
+       if err != nil {
+               return status.Error(codes.Unauthenticated, err.Error())
+       }
 
        s.globalPushContext().InitContext(s.Env, nil, nil)
        con := newDeltaConnection(peerAddr, stream)
+       con.s = s
 
-       go s.receiveDelta(con, nil)
+       go s.receiveDelta(con, ids)
 
        <-con.InitializedCh()
 
@@ -154,6 +159,18 @@ func (s *DiscoveryServer) receiveDelta(con *Connection, 
identities []string) {
                        deltaLog.Infof("new delta connection for node:%s", 
con.ID())
                }
 
+               subscribeStr := " [wildcard]"
+               if len(req.ResourceNamesSubscribe) > 0 {
+                       subscribeStr = " [" + 
strings.Join(req.ResourceNamesSubscribe, ", ") + "]"
+               }
+               unsubscribeStr := ""
+               if len(req.ResourceNamesUnsubscribe) > 0 {
+                       unsubscribeStr = " unsubscribe:[" + 
strings.Join(req.ResourceNamesUnsubscribe, ", ") + "]"
+               }
+               deltaLog.Infof("%s: RAW DELTA REQ %s sub:%d%s nonce:%s%s",
+                       v3.GetShortType(req.TypeUrl), con.ID(), 
len(req.ResourceNamesSubscribe), subscribeStr,
+                       req.ResponseNonce, unsubscribeStr)
+
                select {
                case con.deltaReqChan <- req:
                case <-con.deltaStream.Context().Done():
diff --git a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go 
b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
index 92218554..e18b9190 100644
--- a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
+++ b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
@@ -273,14 +273,18 @@ func (b *EndpointBuilder) servicePort(port int) 
*model.Port {
 }
 
 func (b *EndpointBuilder) matchesSubset(epLabels labels.Instance) bool {
-       // TODO: implement subset matching logic based on SubsetRule
-       // For now, return true if no subset is specified or if subset is empty
        if b.subsetName == "" {
                return true
        }
-       // Simplified subset matching - in real implementation, this should 
match
-       // against SubsetRule subset labels
-       return true
+       if b.service == nil || b.push == nil {
+               return true
+       }
+       selector := b.push.SubsetLabelsForHost(b.service.Attributes.Namespace, 
b.hostname, b.subsetName)
+       if len(selector) == 0 {
+               // No subset labels defined, treat as match-all
+               return true
+       }
+       return selector.SubsetOf(epLabels)
 }
 
 func (b *EndpointBuilder) buildLbEndpoint(ep *model.DubboEndpoint) 
*endpoint.LbEndpoint {
diff --git a/dubbod/planet/pkg/xds/xdsgen.go b/dubbod/planet/pkg/xds/xdsgen.go
index 318ba13a..54e7f1aa 100644
--- a/dubbod/planet/pkg/xds/xdsgen.go
+++ b/dubbod/planet/pkg/xds/xdsgen.go
@@ -580,6 +580,26 @@ func (s *DiscoveryServer) pushDeltaXds(con *Connection, w 
*model.WatchedResource
                resp.RemovedResources = sets.SortedList(removed)
        }
        var newResourceNames sets.String
+       if shouldSetWatchedResources(w) {
+               if usedDelta {
+                       if w.ResourceNames != nil {
+                               newResourceNames = w.ResourceNames.Copy()
+                       } else {
+                               newResourceNames = sets.New[string]()
+                       }
+                       for _, removed := range resp.RemovedResources {
+                               newResourceNames.Delete(removed)
+                       }
+                       for _, r := range res {
+                               newResourceNames.Insert(r.Name)
+                       }
+               } else {
+                       newResourceNames = resourceNamesSet(res)
+               }
+       }
+       if neverRemoveDelta(w.TypeUrl) {
+               resp.RemovedResources = nil
+       }
        if len(resp.RemovedResources) > 0 {
                deltaLog.Infof("%v REMOVE for node:%s %v", 
v3.GetShortType(w.TypeUrl), con.ID(), resp.RemovedResources)
        }
@@ -632,6 +652,36 @@ func (s *DiscoveryServer) findGenerator(typeURL string, 
con *Connection) model.X
        return g
 }
 
+func resourceNamesSet(res model.Resources) sets.String {
+       names := sets.New[string]()
+       for _, r := range res {
+               if r != nil {
+                       names.Insert(r.Name)
+               }
+       }
+       return names
+}
+
+func shouldSetWatchedResources(w *model.WatchedResource) bool {
+       if w == nil {
+               return false
+       }
+       if requiresResourceNamesModification(w.TypeUrl) {
+               return false
+       }
+       return xds.IsWildcardTypeURL(w.TypeUrl)
+}
+
+func requiresResourceNamesModification(typeURL string) bool {
+       return typeURL == v3.AddressType
+}
+
+func neverRemoveDelta(typeURL string) bool {
+       // Align with Envoy bug https://github.com/envoyproxy/envoy/issues/32823
+       // Skip removals for ExtensionConfiguration to avoid flapping.
+       return typeURL == v3.ExtensionConfigurationType
+}
+
 // extractRouteNamesFromLDS extracts route names referenced in LDS listener 
resources
 // For outbound listeners with ApiListener, the route name is the 
RouteConfigName from Rds config
 // Route name format is the same as cluster name: "outbound|port||hostname"
diff --git a/manifests/charts/base/files/crd-all.yaml 
b/manifests/charts/base/files/crd-all.yaml
index 6e55e0cc..9f82a97e 100644
--- a/manifests/charts/base/files/crd-all.yaml
+++ b/manifests/charts/base/files/crd-all.yaml
@@ -197,7 +197,7 @@ spec:
                           redirect or forward (default) traffic.
                         items:
                           properties:
-                            subset:
+                            destination:
                               description: Subset uniquely identifies the 
instances
                                 of a service to which the request/connection 
should
                                 be forwarded to.
@@ -225,7 +225,7 @@ spec:
                               format: int32
                               type: integer
                           required:
-                            - subset
+                            - destination
                           type: object
                         type: array
                     type: object
diff --git 
a/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml 
b/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
index 3528c947..2efb6f52 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
@@ -7,6 +7,9 @@ rules:
   - apiGroups: [ "security.istio.io", "networking.istio.io" ]
     verbs: [ "get", "watch", "list" ]
     resources: [ "*" ]
+  - apiGroups: [ "security.dubbo.apache.org", "networking.dubbo.apache.org" ]
+    verbs: [ "get", "watch", "list" ]
+    resources: [ "*" ]
   - apiGroups: ["admissionregistration.k8s.io"]
     resources: ["mutatingwebhookconfigurations"]
     verbs: ["get", "list", "watch", "update", "patch"]
diff --git 
a/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml
 
b/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml
index 18c3d772..34c303bd 100644
--- 
a/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml
+++ 
b/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml
@@ -4,24 +4,37 @@ kind: ValidatingWebhookConfiguration
 metadata:
   name: dubbo-validator-dubbo-system
   namespace: default
+  labels:
+    app: dubbod
+    dubbo.apache.org/rev: {{ .Values.revision | default "default" }}
 webhooks:
-  - name: validation.dubbo.apache.org
+  - name: rev.validation.dubbo.apache.org
+    admissionReviewVersions: ["v1"]
     clientConfig:
       service:
         name: dubbod
         namespace: dubbo-system
         path: "/validate"
+      caBundle: ""
+    failurePolicy: Ignore
+    objectSelector:
+      matchExpressions:
+      - key: dubbo.apache.org/rev
+        operator: In
+        values:
+        - default
     rules:
-      - operations:
-          - CREATE
-          - UPDATE
-        apiGroups:
-          - security.istio.io
-          - networking.istio.io
-        apiVersions:
-          - "*"
-        resources:
-          - "*"
+    - operations:
+      - CREATE
+      - UPDATE
+      apiGroups:
+      - security.istio.io
+      - networking.istio.io
+      - security.dubbo.apache.org
+      - networking.dubbo.apache.org
+      apiVersions:
+      - "*"
+      resources:
+      - "*"
     sideEffects: None
-    admissionReviewVersions: ["v1"]
 
diff --git a/pkg/config/schema/collections/collections.agent.go 
b/pkg/config/schema/collections/collections.agent.go
index b7a6a894..60bb62de 100644
--- a/pkg/config/schema/collections/collections.agent.go
+++ b/pkg/config/schema/collections/collections.agent.go
@@ -49,7 +49,7 @@ var (
                Identifier:     "SubsetRule",
                Group:          "networking.dubbo.apache.org",
                Kind:           "SubsetRule",
-               Plural:         "destinationrules",
+               Plural:         "subsetrules",
                Version:        "v1",
                VersionAliases: []string{},
                Proto:          "istio.networking.v1alpha3.DestinationRule", 
StatusProto: "istio.meta.v1alpha1.IstioStatus",
@@ -60,9 +60,9 @@ var (
                Builtin:       false,
        }.MustBuild()
        ServiceRoute = collection.Builder{
-               Identifier:     "serviceRoute",
+               Identifier:     "ServiceRoute",
                Group:          "networking.dubbo.apache.org",
-               Kind:           "serviceRoute",
+               Kind:           "ServiceRoute",
                Plural:         "serviceroutes",
                Version:        "v1",
                VersionAliases: []string{},
diff --git a/pkg/config/schema/collections/collections.go 
b/pkg/config/schema/collections/collections.go
index a144c236..49a15f41 100644
--- a/pkg/config/schema/collections/collections.go
+++ b/pkg/config/schema/collections/collections.go
@@ -49,7 +49,7 @@ var (
                Identifier:     "SubsetRule",
                Group:          "networking.dubbo.apache.org",
                Kind:           "SubsetRule",
-               Plural:         "destinationrules",
+               Plural:         "subsetrules",
                Version:        "v1",
                VersionAliases: []string{},
                Proto:          "istio.networking.v1alpha3.DestinationRule", 
StatusProto: "istio.meta.v1alpha1.IstioStatus",
@@ -60,9 +60,9 @@ var (
                Builtin:       false,
        }.MustBuild()
        ServiceRoute = collection.Builder{
-               Identifier:     "serviceRoute",
+               Identifier:     "ServiceRoute",
                Group:          "networking.dubbo.apache.org",
-               Kind:           "serviceRoute",
+               Kind:           "ServiceRoute",
                Plural:         "serviceroutes",
                Version:        "v1",
                VersionAliases: []string{},
diff --git a/pkg/config/schema/gvk/resources.go 
b/pkg/config/schema/gvk/resources.go
index cf3828cd..a55a3f4e 100644
--- a/pkg/config/schema/gvk/resources.go
+++ b/pkg/config/schema/gvk/resources.go
@@ -40,7 +40,7 @@ var (
        MeshConfig                     = config.GroupVersionKind{Group: "", 
Version: "v1alpha1", Kind: "MeshConfig"}
        PeerAuthentication             = config.GroupVersionKind{Group: 
"security.dubbo.apache.org", Version: "v1", Kind: "PeerAuthentication"}
        SubsetRule                     = config.GroupVersionKind{Group: 
"networking.dubbo.apache.org", Version: "v1", Kind: "SubsetRule"}
-       ServiceRoute                   = config.GroupVersionKind{Group: 
"networking.dubbo.apache.org", Version: "v1", Kind: "serviceRoute"}
+       ServiceRoute                   = config.GroupVersionKind{Group: 
"networking.dubbo.apache.org", Version: "v1", Kind: "ServiceRoute"}
        EndpointSlice                  = config.GroupVersionKind{Group: 
"discovery.k8s.io", Version: "v1", Kind: "EndpointSlice"}
        Endpoints                      = config.GroupVersionKind{Group: "", 
Version: "v1", Kind: "Endpoints"}
 )
diff --git a/pkg/config/schema/kind/resources.go 
b/pkg/config/schema/kind/resources.go
index 2fdccc3e..602ca515 100644
--- a/pkg/config/schema/kind/resources.go
+++ b/pkg/config/schema/kind/resources.go
@@ -68,7 +68,7 @@ func (k Kind) String() string {
        case PeerAuthentication:
                return "PeerAuthentication"
        case ServiceRoute:
-               return "serviceRoute"
+               return "ServiceRoute"
        case SubsetRule:
                return "SubsetRule"
        default:
diff --git a/pkg/config/schema/kubeclient/resources.go 
b/pkg/config/schema/kubeclient/resources.go
index 6cf97081..8e0ca75d 100644
--- a/pkg/config/schema/kubeclient/resources.go
+++ b/pkg/config/schema/kubeclient/resources.go
@@ -20,9 +20,11 @@ package kubeclient
 import (
        "context"
        "fmt"
+
        "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvr"
        "github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
        ktypes "github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
+       "github.com/apache/dubbo-kubernetes/pkg/log"
        "github.com/apache/dubbo-kubernetes/pkg/util/ptr"
        apiistioioapinetworkingv1 "istio.io/client-go/pkg/apis/networking/v1"
        apiistioioapisecurityv1 "istio.io/client-go/pkg/apis/security/v1"
@@ -216,25 +218,78 @@ func getInformerFiltered(c ClientGetter, opts 
ktypes.InformerOptions, g schema.G
                        return 
c.Kube().AdmissionregistrationV1().ValidatingWebhookConfigurations().Watch(context.Background(),
 options)
                }
        case gvr.ServiceRoute:
+               // ServiceRoute uses networking.dubbo.apache.org API group, not 
networking.istio.io
+               // Use Dynamic client to access it
+               gvr := schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "serviceroutes",
+               }
                l = func(options metav1.ListOptions) (runtime.Object, error) {
-                       return 
c.Dubbo().NetworkingV1().VirtualServices(opts.Namespace).List(context.Background(),
 options)
+                       return 
c.Dynamic().Resource(gvr).Namespace(opts.Namespace).List(context.Background(), 
options)
                }
                w = func(options metav1.ListOptions) (watch.Interface, error) {
-                       return 
c.Dubbo().NetworkingV1().VirtualServices(opts.Namespace).Watch(context.Background(),
 options)
+                       return 
c.Dynamic().Resource(gvr).Namespace(opts.Namespace).Watch(context.Background(), 
options)
                }
        case gvr.SubsetRule:
+               // SubsetRule uses networking.dubbo.apache.org API group, not 
networking.istio.io
+               // Use Dynamic client to access it
+               gvr := schema.GroupVersionResource{
+                       Group:    "networking.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "subsetrules",
+               }
                l = func(options metav1.ListOptions) (runtime.Object, error) {
-                       return 
c.Dubbo().NetworkingV1().DestinationRules(opts.Namespace).List(context.Background(),
 options)
+                       // Log the namespace being watched for diagnosis
+                       if opts.Namespace == "" {
+                               log.Infof("SubsetRule informer: List called for 
all namespaces")
+                       } else {
+                               log.Infof("SubsetRule informer: List called for 
namespace %s", opts.Namespace)
+                       }
+                       return 
c.Dynamic().Resource(gvr).Namespace(opts.Namespace).List(context.Background(), 
options)
                }
                w = func(options metav1.ListOptions) (watch.Interface, error) {
-                       return 
c.Dubbo().NetworkingV1().DestinationRules(opts.Namespace).Watch(context.Background(),
 options)
+                       // Log the namespace being watched for diagnosis
+                       if opts.Namespace == "" {
+                               log.Infof("SubsetRule informer: Watch called 
for all namespaces")
+                       } else {
+                               log.Infof("SubsetRule informer: Watch called 
for namespace %s", opts.Namespace)
+                       }
+                       watchInterface, err := 
c.Dynamic().Resource(gvr).Namespace(opts.Namespace).Watch(context.Background(), 
options)
+                       if err != nil {
+                               log.Errorf("SubsetRule informer: Watch failed: 
%v", err)
+                       } else {
+                               log.Infof("SubsetRule informer: Watch 
connection established successfully")
+                       }
+                       return watchInterface, err
                }
        case gvr.PeerAuthentication:
+               peerAuthGVR := schema.GroupVersionResource{
+                       Group:    "security.dubbo.apache.org",
+                       Version:  "v1",
+                       Resource: "peerauthentications",
+               }
                l = func(options metav1.ListOptions) (runtime.Object, error) {
-                       return 
c.Dubbo().SecurityV1().PeerAuthentications(opts.Namespace).List(context.Background(),
 options)
+                       if opts.Namespace == "" {
+                               log.Infof("PeerAuthentication informer: List 
called for all namespaces")
+                       } else {
+                               log.Infof("PeerAuthentication informer: List 
called for namespace %s", opts.Namespace)
+                       }
+                       return 
c.Dynamic().Resource(peerAuthGVR).Namespace(opts.Namespace).List(context.Background(),
 options)
                }
                w = func(options metav1.ListOptions) (watch.Interface, error) {
-                       return 
c.Dubbo().SecurityV1().PeerAuthentications(opts.Namespace).Watch(context.Background(),
 options)
+                       if opts.Namespace == "" {
+                               log.Infof("PeerAuthentication informer: Watch 
called for all namespaces")
+                       } else {
+                               log.Infof("PeerAuthentication informer: Watch 
called for namespace %s", opts.Namespace)
+                       }
+                       watchInterface, err := 
c.Dynamic().Resource(peerAuthGVR).Namespace(opts.Namespace).Watch(context.Background(),
 options)
+                       if err != nil {
+                               log.Errorf("PeerAuthentication informer: Watch 
failed: %v", err)
+                       } else {
+                               log.Infof("PeerAuthentication informer: Watch 
connection established successfully")
+                       }
+                       return watchInterface, err
                }
        case gvr.Pod:
                l = func(options metav1.ListOptions) (runtime.Object, error) {
diff --git a/pkg/kube/kclient/client.go b/pkg/kube/kclient/client.go
index eb65ff28..50363027 100644
--- a/pkg/kube/kclient/client.go
+++ b/pkg/kube/kclient/client.go
@@ -20,6 +20,7 @@ package kclient
 import (
        "context"
        "fmt"
+
        "github.com/apache/dubbo-kubernetes/pkg/util/ptr"
 
        "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
@@ -39,7 +40,6 @@ import (
        "k8s.io/client-go/tools/cache"
 
        "sync"
-       "sync/atomic"
 
        dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
 )
@@ -123,36 +123,6 @@ func NewDelayedInformer[T controllers.ComparableObject](c 
kube.Client, gvr schem
        return newDelayedInformer[T](gvr, inf, delay, filter)
 }
 
-func newDelayedInformer[T controllers.ComparableObject](gvr 
schema.GroupVersionResource, getInf func() informerfactory.StartableInformer, 
delay kubetypes.DelayedFilter, filter Filter) Informer[T] {
-       delayedClient := &delayedClient[T]{
-               inf:     new(atomic.Pointer[Informer[T]]),
-               delayed: delay,
-       }
-
-       // If resource is not yet known, we will use the delayedClient.
-       // When the resource is later loaded, the callback will trigger and 
swap our dummy delayedClient
-       // with a full client
-       readyNow := delay.KnownOrCallback(func(stop <-chan struct{}) {
-               // The inf() call is responsible for starting the informer
-               inf := getInf()
-               fc := &informerClient[T]{
-                       informer:      inf.Informer,
-                       startInformer: inf.Start,
-               }
-               applyDynamicFilter(filter, gvr, fc)
-               inf.Start(stop)
-               log.Infof("%v is now ready, building client", 
gvr.GroupResource())
-               // Swap out the dummy client with the full one
-               delayedClient.set(fc)
-       })
-       if !readyNow {
-               log.Debugf("%v is not ready now, building delayed client", 
gvr.GroupResource())
-               return delayedClient
-       }
-       log.Debugf("%v ready now, building client", gvr.GroupResource())
-       return newInformerClient[T](gvr, getInf(), filter)
-}
-
 func NewFiltered[T controllers.ComparableObject](c kube.Client, filter Filter) 
Client[T] {
        gvr := types.MustToGVR[T](types.MustGVKFromType[T]())
        inf := kubeclient.GetInformerFiltered[T](c, ToOpts(c, gvr, filter), gvr)
@@ -179,25 +149,35 @@ func applyDynamicFilter[T 
controllers.ComparableObject](filter Filter, gvr schem
                filter.ObjectFilter.AddHandler(func(added, removed sets.String) 
{
                        ic.handlerMu.RLock()
                        defer ic.handlerMu.RUnlock()
+                       log.Infof("applyDynamicFilter: namespace filter handler 
triggered for %v: added=%v, removed=%v", gvr, added, removed)
                        if gvr == dubbogvr.Namespace {
                                for _, item := range 
ic.ListUnfiltered(metav1.NamespaceAll, klabels.Everything()) {
                                        if !added.Contains(item.GetName()) {
                                                continue
                                        }
+                                       log.Infof("applyDynamicFilter: 
triggering OnAdd for namespace %s", item.GetName())
                                        for _, c := range ic.registeredHandlers 
{
                                                c.handler.OnAdd(item, false)
                                        }
                                }
                        } else {
                                for ns := range added {
-                                       for _, item := range 
ic.ListUnfiltered(ns, klabels.Everything()) {
+                                       log.Infof("applyDynamicFilter: 
namespace %s added, listing unfiltered objects for %v", ns, gvr)
+                                       items := ic.ListUnfiltered(ns, 
klabels.Everything())
+                                       log.Infof("applyDynamicFilter: found %d 
unfiltered objects in namespace %s for %v", len(items), ns, gvr)
+                                       for _, item := range items {
+                                               log.Infof("applyDynamicFilter: 
triggering OnAdd for %s/%s in namespace %s", item.GetNamespace(), 
item.GetName(), ns)
                                                for _, c := range 
ic.registeredHandlers {
                                                        c.handler.OnAdd(item, 
false)
                                                }
                                        }
                                }
                                for ns := range removed {
-                                       for _, item := range 
ic.ListUnfiltered(ns, klabels.Everything()) {
+                                       log.Infof("applyDynamicFilter: 
namespace %s removed, listing unfiltered objects for %v", ns, gvr)
+                                       items := ic.ListUnfiltered(ns, 
klabels.Everything())
+                                       log.Infof("applyDynamicFilter: found %d 
unfiltered objects in namespace %s for %v", len(items), ns, gvr)
+                                       for _, item := range items {
+                                               log.Infof("applyDynamicFilter: 
triggering OnDelete for %s/%s in namespace %s", item.GetNamespace(), 
item.GetName(), ns)
                                                for _, c := range 
ic.registeredHandlers {
                                                        c.handler.OnDelete(item)
                                                }
@@ -210,15 +190,32 @@ func applyDynamicFilter[T 
controllers.ComparableObject](filter Filter, gvr schem
 
 func (n *informerClient[T]) List(namespace string, selector klabels.Selector) 
[]T {
        var res []T
+       var filteredCount int
+       var totalCount int
        err := cache.ListAllByNamespace(n.informer.GetIndexer(), namespace, 
selector, func(i any) {
+               totalCount++
                cast := i.(T)
                if n.applyFilter(cast) {
                        res = append(res, cast)
+               } else {
+                       filteredCount++
+                       // Log filtered objects to help diagnose
+                       if objWithNs, ok := any(cast).(interface {
+                               GetNamespace() string
+                               GetName() string
+                       }); ok {
+                               log.Debugf("informerClient.List: filtered out 
object %s/%s for namespace=%s", objWithNs.GetNamespace(), objWithNs.GetName(), 
namespace)
+                       }
                }
        })
 
        if err != nil {
-               fmt.Printf("lister returned err for %v: %v", namespace, err)
+               log.Warnf("informerClient.List: lister returned err for 
namespace=%s: %v", namespace, err)
+       }
+       if namespace == metav1.NamespaceAll {
+               log.Infof("informerClient.List: namespace=%s, total=%d, 
filtered=%d, result=%d", namespace, totalCount, filteredCount, len(res))
+       } else if filteredCount > 0 {
+               log.Debugf("informerClient.List: filtered out %d items for 
namespace=%s (total=%d, result=%d)", filteredCount, namespace, totalCount, 
len(res))
        }
        return res
 }
@@ -231,7 +228,20 @@ func (n *informerClient[T]) ListUnfiltered(namespace 
string, selector klabels.Se
        })
 
        if err != nil {
-               fmt.Printf("lister returned err for %v: %v", namespace, err)
+               log.Warnf("informerClient.ListUnfiltered: lister returned err 
for namespace=%s: %v", namespace, err)
+       }
+       if namespace == metav1.NamespaceAll {
+               log.Infof("informerClient.ListUnfiltered: found %d unfiltered 
objects for namespace=%s (synced=%v)", len(res), namespace, 
n.informer.HasSynced())
+               if len(res) > 0 {
+                       for i, obj := range res {
+                               if objWithNs, ok := any(obj).(interface {
+                                       GetNamespace() string
+                                       GetName() string
+                               }); ok {
+                                       
log.Infof("informerClient.ListUnfiltered: object[%d] %s/%s", i, 
objWithNs.GetNamespace(), objWithNs.GetName())
+                               }
+                       }
+               }
        }
        return res
 }
@@ -285,12 +295,66 @@ func (n *informerClient[T]) ShutdownHandlers() {
 func (n *informerClient[T]) AddEventHandler(h cache.ResourceEventHandler) 
cache.ResourceEventHandlerRegistration {
        fh := cache.FilteringResourceEventHandler{
                FilterFunc: func(obj interface{}) bool {
+                       var nameStr, nsStr string
+                       if objWithNs, ok := any(obj).(interface {
+                               GetNamespace() string
+                               GetName() string
+                       }); ok {
+                               nsStr = objWithNs.GetNamespace()
+                               nameStr = objWithNs.GetName()
+                       }
                        if n.filter == nil {
+                               log.Debugf("informerClient.AddEventHandler: 
FilterFunc allowing object %s/%s (no filter)", nsStr, nameStr)
                                return true
                        }
-                       return n.filter(obj)
+                       cast := obj.(T)
+                       allowed := n.filter(cast)
+                       if !allowed {
+                               // Log when objects are filtered out to help 
diagnose missing events
+                               log.Infof("informerClient.AddEventHandler: 
FilterFunc filtered out object %s/%s", nsStr, nameStr)
+                       } else {
+                               log.Debugf("informerClient.AddEventHandler: 
FilterFunc allowing object %s/%s", nsStr, nameStr)
+                       }
+                       return allowed
+               },
+               Handler: cache.ResourceEventHandlerFuncs{
+                       AddFunc: func(obj interface{}) {
+                               var nameStr, nsStr string
+                               if objWithNs, ok := any(obj).(interface {
+                                       GetNamespace() string
+                                       GetName() string
+                               }); ok {
+                                       nsStr = objWithNs.GetNamespace()
+                                       nameStr = objWithNs.GetName()
+                               }
+                               log.Infof("informerClient.AddEventHandler: 
OnAdd called for %s/%s", nsStr, nameStr)
+                               h.OnAdd(obj, false)
+                       },
+                       UpdateFunc: func(oldObj, newObj interface{}) {
+                               var nameStr, nsStr string
+                               if objWithNs, ok := any(newObj).(interface {
+                                       GetNamespace() string
+                                       GetName() string
+                               }); ok {
+                                       nsStr = objWithNs.GetNamespace()
+                                       nameStr = objWithNs.GetName()
+                               }
+                               log.Infof("informerClient.AddEventHandler: 
OnUpdate called for %s/%s", nsStr, nameStr)
+                               h.OnUpdate(oldObj, newObj)
+                       },
+                       DeleteFunc: func(obj interface{}) {
+                               var nameStr, nsStr string
+                               if objWithNs, ok := any(obj).(interface {
+                                       GetNamespace() string
+                                       GetName() string
+                               }); ok {
+                                       nsStr = objWithNs.GetNamespace()
+                                       nameStr = objWithNs.GetName()
+                               }
+                               log.Infof("informerClient.AddEventHandler: 
OnDelete called for %s/%s", nsStr, nameStr)
+                               h.OnDelete(obj)
+                       },
                },
-               Handler: h,
        }
        n.handlerMu.Lock()
        defer n.handlerMu.Unlock()
diff --git a/pkg/kube/kclient/delayed.go b/pkg/kube/kclient/delayed.go
index 4c7b35ef..5278783a 100644
--- a/pkg/kube/kclient/delayed.go
+++ b/pkg/kube/kclient/delayed.go
@@ -18,11 +18,13 @@
 package kclient
 
 import (
-       "github.com/apache/dubbo-kubernetes/pkg/util/ptr"
        "sync"
        "sync/atomic"
 
+       "github.com/apache/dubbo-kubernetes/pkg/util/ptr"
+
        "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
        "github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
        "github.com/apache/dubbo-kubernetes/pkg/slices"
        klabels "k8s.io/apimachinery/pkg/labels"
@@ -89,6 +91,8 @@ func (s *delayedClient[T]) set(inf Informer[T]) {
                s.hm.Lock()
                defer s.hm.Unlock()
                for _, h := range s.handlers {
+                       // h is a delayedHandler which embeds 
ResourceEventHandler, so we can pass it directly
+                       // This matches Istio's implementation
                        reg := inf.AddEventHandler(h)
                        h.hasSynced.hasSynced.Store(ptr.Of(reg.HasSynced))
                }
@@ -210,3 +214,43 @@ func (d delayedIndex[T]) Lookup(key string) []interface{} {
        // Not ready yet, return nil
        return nil
 }
+
+func newDelayedInformer[T controllers.ComparableObject](
+       gvr schema.GroupVersionResource,
+       getInf func() informerfactory.StartableInformer,
+       delay kubetypes.DelayedFilter,
+       filter Filter,
+) Informer[T] {
+       delayedClient := &delayedClient[T]{
+               inf:     new(atomic.Pointer[Informer[T]]),
+               delayed: delay,
+       }
+
+       // If resource is not yet known, we will use the delayedClient.
+       // When the resource is later loaded, the callback will trigger and 
swap our dummy delayedClient
+       // with a full client
+       readyNow := delay.KnownOrCallback(func(stop <-chan struct{}) {
+               // The inf() call is responsible for starting the informer
+               inf := getInf()
+               fc := &informerClient[T]{
+                       informer:      inf.Informer,
+                       startInformer: inf.Start,
+               }
+               applyDynamicFilter(filter, gvr, fc)
+               // Swap out the dummy client with the full one BEFORE starting 
the informer
+               // This ensures handlers are registered before the informer 
starts syncing
+               delayedClient.set(fc)
+               inf.Start(stop)
+               log.Infof("%v is now ready, building client", 
gvr.GroupResource())
+       })
+       if !readyNow {
+               log.Debugf("%v is not ready now, building delayed client", 
gvr.GroupResource())
+               return delayedClient
+       }
+       log.Debugf("%v ready now, building client", gvr.GroupResource())
+       // When readyNow is true, we need to ensure the informer is registered 
with the factory
+       // so InformerFactory.Start() can pick it up.
+       inf := getInf()
+       fc := newInformerClient[T](gvr, inf, filter)
+       return fc
+}
diff --git a/pkg/kube/krt/informer.go b/pkg/kube/krt/informer.go
index 0ddefe30..120ebaef 100644
--- a/pkg/kube/krt/informer.go
+++ b/pkg/kube/krt/informer.go
@@ -19,6 +19,7 @@ package krt
 
 import (
        "fmt"
+
        "github.com/apache/dubbo-kubernetes/pkg/util/ptr"
 
        "github.com/apache/dubbo-kubernetes/pkg/kube"
@@ -155,7 +156,44 @@ func (i *informer[I]) Register(f func(o Event[I])) 
HandlerRegistration {
 
 func (i *informer[I]) RegisterBatch(f func(o []Event[I]), runExistingState 
bool) HandlerRegistration {
        synced := i.inf.AddEventHandler(informerEventHandler[I](func(o 
Event[I], initialSync bool) {
-               f([]Event[I]{o})
+               // Only process events if runExistingState is true OR this is 
not an initial sync event
+               // This matches Istio's behavior: runExistingState=false means 
skip initial sync events
+               if runExistingState || !initialSync {
+                       // Log all events to help diagnose missing events
+                       var nameStr, nsStr string
+                       if o.New != nil {
+                               if objWithNs, ok := any(*o.New).(interface {
+                                       GetNamespace() string
+                                       GetName() string
+                               }); ok {
+                                       nsStr = objWithNs.GetNamespace()
+                                       nameStr = objWithNs.GetName()
+                               }
+                       } else if o.Old != nil {
+                               if objWithNs, ok := any(*o.Old).(interface {
+                                       GetNamespace() string
+                                       GetName() string
+                               }); ok {
+                                       nsStr = objWithNs.GetNamespace()
+                                       nameStr = objWithNs.GetName()
+                               }
+                       }
+                       log.Debugf("informer.RegisterBatch: processing event %s 
for %s/%s (initialSync=%v, runExistingState=%v)", o.Event, nsStr, nameStr, 
initialSync, runExistingState)
+                       f([]Event[I]{o})
+               } else {
+                       // Log skipped events to help diagnose
+                       var nameStr, nsStr string
+                       if o.New != nil {
+                               if objWithNs, ok := any(*o.New).(interface {
+                                       GetNamespace() string
+                                       GetName() string
+                               }); ok {
+                                       nsStr = objWithNs.GetNamespace()
+                                       nameStr = objWithNs.GetName()
+                               }
+                       }
+                       log.Debugf("informer.RegisterBatch: skipping initial 
sync event for %s/%s (initialSync=%v, runExistingState=%v)", nsStr, nameStr, 
initialSync, runExistingState)
+               }
        }))
        base := i.baseSyncer
        handler := pollSyncer{
diff --git a/pkg/kube/namespace/filter.go b/pkg/kube/namespace/filter.go
index b0f60c7d..0e59b309 100644
--- a/pkg/kube/namespace/filter.go
+++ b/pkg/kube/namespace/filter.go
@@ -107,10 +107,12 @@ func (d *discoveryNamespacesFilter) Filter(obj any) bool {
        // When an object is deleted, obj could be a DeletionFinalStateUnknown 
marker item.
        ns, ok := extractObjectNamespace(obj)
        if !ok {
+               log.Debugf("discoveryNamespacesFilter.Filter: failed to extract 
namespace from object, rejecting")
                return false
        }
        if ns == "" {
                // Cluster scoped resources. Always included
+               log.Debugf("discoveryNamespacesFilter.Filter: cluster-scoped 
resource, allowing")
                return true
        }
 
@@ -118,11 +120,18 @@ func (d *discoveryNamespacesFilter) Filter(obj any) bool {
        defer d.lock.RUnlock()
        // permit all objects if discovery selectors are not specified
        if len(d.discoverySelectors) == 0 {
+               log.Debugf("discoveryNamespacesFilter.Filter: no discovery 
selectors, allowing namespace %s", ns)
                return true
        }
 
        // permit if object resides in a namespace labeled for discovery
-       return d.discoveryNamespaces.Contains(ns)
+       allowed := d.discoveryNamespaces.Contains(ns)
+       if !allowed {
+               log.Infof("discoveryNamespacesFilter.Filter: namespace %s not 
in discoveryNamespaces (selectors=%d, namespaces=%v), rejecting", ns, 
len(d.discoverySelectors), d.discoveryNamespaces)
+       } else {
+               log.Debugf("discoveryNamespacesFilter.Filter: namespace %s in 
discoveryNamespaces, allowing", ns)
+       }
+       return allowed
 }
 
 // AddHandler registers a handler on namespace, which will be triggered when 
namespace selected or deselected.
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index 5a2de13a..bf5e126b 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -192,6 +192,13 @@ func Receive(ctx ConnectionContext) {
                        // Connection logged in initConnection() after addCon() 
to ensure accurate counting
                }
 
+               resourceNamesStr := " [wildcard]"
+               if len(req.ResourceNames) > 0 {
+                       resourceNamesStr = " [" + 
strings.Join(req.ResourceNames, ", ") + "]"
+               }
+               log.Infof("%s: RAW REQ %s resources:%d nonce:%s%s",
+                       model.GetShortType(req.TypeUrl), con.conID, 
len(req.ResourceNames), req.ResponseNonce, resourceNamesStr)
+
                select {
                case con.reqChan <- req:
                case <-con.stream.Context().Done():
@@ -316,6 +323,32 @@ func ShouldRespond(w Watcher, id string, request 
*discovery.DiscoveryRequest) (b
        // A nonce becomes stale following a newer nonce being sent to Envoy.
        // previousInfo.NonceSent can be empty if we previously had 
shouldRespond=true but didn't send any resources.
        if request.ResponseNonce != previousInfo.NonceSent {
+               newResources := sets.New(request.ResourceNames...)
+               // Special-case proxyless gRPC: Envoy will send a "stale" nonce 
when it changes
+               // subscriptions (e.g., after ServiceRoute introduces subset 
clusters). Treat this
+               // as a resource change rather than an ACK so the new clusters 
get a response.
+               previousResourcesCopy := previousInfo.ResourceNames.Copy()
+               if !newResources.Equals(previousResourcesCopy) && 
len(newResources) > 0 {
+                       log.Infof("%s: REQ %s nonce mismatch (got %s, sent %s) 
but resources changed -> responding",
+                               stype, id, request.ResponseNonce, 
previousInfo.NonceSent)
+                       added := newResources.Difference(previousResourcesCopy)
+                       w.UpdateWatchedResource(request.TypeUrl, func(wr 
*WatchedResource) *WatchedResource {
+                               if wr == nil {
+                                       return nil
+                               }
+                               wr.LastError = ""
+                               wr.ResourceNames = newResources
+                               // keep previous nonce so the subsequent ACK 
can match
+                               return wr
+                       })
+                       if len(added) == 0 {
+                               // Still respond to make sure client receives 
an update even if map difference logic
+                               // thinks nothing was added (e.g., only removal 
happened).
+                               return true, ResourceDelta{Subscribed: added}
+                       }
+                       return true, ResourceDelta{Subscribed: added}
+               }
+
                // Expired/stale nonce - don't respond, just log at debug level
                if previousInfo.NonceSent == "" {
                        // We never sent a nonce, but client sent one - this is 
unusual but treat as expired
diff --git a/samples/grpc-app/grpc-app.yaml b/samples/grpc-app/grpc-app.yaml
index 506b6405..cb3bef55 100644
--- a/samples/grpc-app/grpc-app.yaml
+++ b/samples/grpc-app/grpc-app.yaml
@@ -32,13 +32,76 @@ spec:
 apiVersion: apps/v1
 kind: Deployment
 metadata:
-  name: consumer
+  name: consumer-v1
+  namespace: grpc-app
+spec:
+  replicas: 2
+  selector:
+    matchLabels:
+      app: consumer
+      version: v1
+  template:
+    metadata:
+      annotations:
+        proxyless.dubbo.apache.org/inject: "true"
+        inject.dubbo.apache.org/templates: grpc-agent
+        proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts": 
true}'
+      labels:
+        app: consumer
+        version: v1
+    spec:
+      containers:
+        - name: app
+          image: mfordjody/grpc-consumer:dev-debug
+          imagePullPolicy: Always
+          ports:
+            - containerPort: 17070
+              protocol: TCP
+              name: grpc
+          env:
+            - name: INSTANCE_IP
+              valueFrom:
+                fieldRef:
+                  apiVersion: v1
+                  fieldPath: status.podIP
+            - name: SERVICE_VERSION
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.labels['version']
+            - name: SERVICE_NAMESPACE
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+            - name: SERVICE_PORT
+              value: "17070"
+          readinessProbe:
+            tcpSocket:
+              port: 17070
+            initialDelaySeconds: 5
+            periodSeconds: 5
+            timeoutSeconds: 2
+            successThreshold: 1
+            failureThreshold: 3
+          livenessProbe:
+            tcpSocket:
+              port: 17070
+            initialDelaySeconds: 10
+            periodSeconds: 10
+            timeoutSeconds: 2
+            successThreshold: 1
+            failureThreshold: 3
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: consumer-v2
   namespace: grpc-app
 spec:
   replicas: 2
   selector:
     matchLabels:
       app: consumer
+      version: v2
   template:
     metadata:
       annotations:
@@ -47,6 +110,7 @@ spec:
         proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts": 
true}'
       labels:
         app: consumer
+        version: v2
     spec:
       containers:
         - name: app
@@ -62,6 +126,16 @@ spec:
                 fieldRef:
                   apiVersion: v1
                   fieldPath: status.podIP
+            - name: SERVICE_VERSION
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.labels['version']
+            - name: SERVICE_NAMESPACE
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+            - name: SERVICE_PORT
+              value: "17070"
           readinessProbe:
             tcpSocket:
               port: 17070
diff --git a/tests/grpc-app/consumer/main.go b/tests/grpc-app/consumer/main.go
index 8e850edd..89da67e9 100644
--- a/tests/grpc-app/consumer/main.go
+++ b/tests/grpc-app/consumer/main.go
@@ -26,6 +26,7 @@ import (
        "os"
        "os/signal"
        "regexp"
+       "strconv"
        "strings"
        "syscall"
        "time"
@@ -47,7 +48,12 @@ var (
 type echoServer struct {
        pb.UnimplementedEchoServiceServer
        pb.UnimplementedEchoTestServiceServer
-       hostname string
+       hostname       string
+       serviceVersion string
+       namespace      string
+       instanceIP     string
+       cluster        string
+       servicePort    int
 }
 
 func (s *echoServer) Echo(ctx context.Context, req *pb.EchoRequest) 
(*pb.EchoResponse, error) {
@@ -56,8 +62,13 @@ func (s *echoServer) Echo(ctx context.Context, req 
*pb.EchoRequest) (*pb.EchoRes
        }
        log.Printf("Received: %v", req.Message)
        return &pb.EchoResponse{
-               Message:  req.Message,
-               Hostname: s.hostname,
+               Message:        req.Message,
+               Hostname:       s.hostname,
+               ServiceVersion: s.serviceVersion,
+               Namespace:      s.namespace,
+               Ip:             s.instanceIP,
+               Cluster:        s.cluster,
+               ServicePort:    int32(s.servicePort),
        }, nil
 }
 
@@ -98,7 +109,14 @@ func (s *echoServer) ForwardEcho(ctx context.Context, req 
*pb.ForwardEchoRequest
 
        output := make([]string, 0, count)
        for i := int32(0); i < count; i++ {
-               line := fmt.Sprintf("[%d body] Hostname=%s", i, s.hostname)
+               line := fmt.Sprintf("[%d body] Hostname=%s ServiceVersion=%s 
ServicePort=%d Namespace=%s",
+                       i, s.hostname, s.serviceVersion, s.servicePort, 
s.namespace)
+               if s.instanceIP != "" {
+                       line += fmt.Sprintf(" IP=%s", s.instanceIP)
+               }
+               if s.cluster != "" {
+                       line += fmt.Sprintf(" Cluster=%s", s.cluster)
+               }
                output = append(output, line)
        }
 
@@ -274,6 +292,15 @@ func waitForBootstrapFile(bootstrapPath string, maxWait 
time.Duration) error {
        }
 }
 
+func firstNonEmpty(values ...string) string {
+       for _, v := range values {
+               if strings.TrimSpace(v) != "" {
+                       return v
+               }
+       }
+       return ""
+}
+
 func main() {
        flag.Parse()
 
@@ -288,6 +315,24 @@ func main() {
                hostname = "unknown"
        }
 
+       namespace := firstNonEmpty(os.Getenv("SERVICE_NAMESPACE"), 
os.Getenv("POD_NAMESPACE"), "default")
+       serviceVersion := firstNonEmpty(
+               os.Getenv("SERVICE_VERSION"),
+               os.Getenv("POD_VERSION"),
+               os.Getenv("VERSION"),
+       )
+       if serviceVersion == "" {
+               serviceVersion = "unknown"
+       }
+       cluster := os.Getenv("SERVICE_CLUSTER")
+       instanceIP := os.Getenv("INSTANCE_IP")
+       servicePort := *port
+       if sp := os.Getenv("SERVICE_PORT"); sp != "" {
+               if parsed, err := strconv.Atoi(sp); err == nil {
+                       servicePort = parsed
+               }
+       }
+
        // Get bootstrap file path from environment variable or use default
        bootstrapPath := os.Getenv("GRPC_XDS_BOOTSTRAP")
        if bootstrapPath == "" {
@@ -315,7 +360,14 @@ func main() {
                log.Fatalf("Failed to create xDS gRPC server: %v", err)
        }
 
-       es := &echoServer{hostname: hostname}
+       es := &echoServer{
+               hostname:       hostname,
+               serviceVersion: serviceVersion,
+               namespace:      namespace,
+               instanceIP:     instanceIP,
+               cluster:        cluster,
+               servicePort:    servicePort,
+       }
        pb.RegisterEchoServiceServer(server, es)
        pb.RegisterEchoTestServiceServer(server, es)
        // Enable reflection API for grpcurl to discover services
diff --git a/tests/grpc-app/producer/main.go b/tests/grpc-app/producer/main.go
index 9916f44d..8bb23199 100644
--- a/tests/grpc-app/producer/main.go
+++ b/tests/grpc-app/producer/main.go
@@ -525,8 +525,29 @@ func (s *testServerImpl) ForwardEcho(ctx context.Context, 
req *pb.ForwardEchoReq
                        continue
                }
 
-               log.Printf("ForwardEcho: request %d succeeded: Hostname=%s", 
i+1, resp.Hostname)
-               output = append(output, fmt.Sprintf("[%d body] Hostname=%s", i, 
resp.Hostname))
+               log.Printf("ForwardEcho: request %d succeeded: Hostname=%s 
ServiceVersion=%s Namespace=%s IP=%s",
+                       i+1, resp.Hostname, resp.ServiceVersion, 
resp.Namespace, resp.Ip)
+
+               lineParts := []string{
+                       fmt.Sprintf("[%d body] Hostname=%s", i, resp.Hostname),
+               }
+               if resp.ServiceVersion != "" {
+                       lineParts = append(lineParts, 
fmt.Sprintf("ServiceVersion=%s", resp.ServiceVersion))
+               }
+               if resp.Namespace != "" {
+                       lineParts = append(lineParts, 
fmt.Sprintf("Namespace=%s", resp.Namespace))
+               }
+               if resp.Ip != "" {
+                       lineParts = append(lineParts, fmt.Sprintf("IP=%s", 
resp.Ip))
+               }
+               if resp.Cluster != "" {
+                       lineParts = append(lineParts, fmt.Sprintf("Cluster=%s", 
resp.Cluster))
+               }
+               if resp.ServicePort > 0 {
+                       lineParts = append(lineParts, 
fmt.Sprintf("ServicePort=%d", resp.ServicePort))
+               }
+
+               output = append(output, strings.Join(lineParts, " "))
 
                // Small delay between successful requests to avoid 
overwhelming the server
                if i < count-1 {
diff --git a/tests/grpc-app/proto/echo.proto b/tests/grpc-app/proto/echo.proto
index c0bd61e5..1d7c32e6 100644
--- a/tests/grpc-app/proto/echo.proto
+++ b/tests/grpc-app/proto/echo.proto
@@ -41,6 +41,11 @@ message EchoRequest {
 message EchoResponse {
   string message = 1;
   string hostname = 2;
+  string service_version = 3;
+  string namespace = 4;
+  string ip = 5;
+  string cluster = 6;
+  int32 service_port = 7;
 }
 
 message ForwardEchoRequest {

Reply via email to