This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new 44a04886 [discovery] xds get all k8s service object (#802)
44a04886 is described below

commit 44a04886389927ece20554edc60c3856dfcfa08b
Author: Jian Zhong <[email protected]>
AuthorDate: Mon Oct 13 13:58:46 2025 +0800

    [discovery] xds get all k8s service object (#802)
---
 pkg/config/constants/constants.go                  |   2 +
 pkg/config/kube/conversion.go                      |  67 ++++++
 pkg/config/schema/collections/collections.go       |  45 ++++
 pkg/config/schema/gvk/resources.go                 |  12 +
 pkg/config/schema/gvr/resources.go                 |   6 +
 pkg/config/schema/kind/resources.go                |   3 +
 pkg/config/schema/kubeclient/resources.go          |  19 ++
 pkg/config/schema/kubetypes/resources.go           |   7 +
 pkg/kube/multicluster/cluster.go                   |  23 ++
 pkg/kube/multicluster/component.go                 |  65 +++++
 pkg/kube/multicluster/secretcontroller.go          |  95 ++++++++
 sail/pkg/bootstrap/server.go                       |  47 +++-
 sail/pkg/bootstrap/servicecontroller.go            |  11 +-
 sail/pkg/config/kube/crdclient/types.go            |  19 ++
 sail/pkg/model/addressmap.go                       |  17 ++
 sail/pkg/model/controller.go                       |  22 ++
 sail/pkg/model/endpointshards.go                   |  47 ++++
 sail/pkg/model/push_context.go                     | 261 ++++++++++++++++++++-
 sail/pkg/model/service.go                          | 178 +++++++++++++-
 .../serviceregistry/kube/controller/controller.go  | 152 ++++++++++++
 .../kube/controller/endpointslice.go               | 120 ++++++++++
 .../kube/controller/multicluster.go                |  94 ++++++++
 sail/pkg/serviceregistry/kube/controller/util.go   |  16 ++
 sail/pkg/serviceregistry/kube/conversion.go        | 152 ++++++++++++
 .../pkg/serviceregistry/serviceentry/controller.go |   4 +
 sail/pkg/xds/eds.go                                |  25 +-
 26 files changed, 1467 insertions(+), 42 deletions(-)

diff --git a/pkg/config/constants/constants.go 
b/pkg/config/constants/constants.go
index 4c779b73..4bcfa552 100644
--- a/pkg/config/constants/constants.go
+++ b/pkg/config/constants/constants.go
@@ -18,6 +18,8 @@
 package constants
 
 const (
+       UnspecifiedIP = "0.0.0.0"
+
        DubboSystemNamespace      = "dubbo-system"
        DefaultClusterLocalDomain = "cluster.local"
        DefaultClusterName        = "Kubernetes"
diff --git a/pkg/config/kube/conversion.go b/pkg/config/kube/conversion.go
new file mode 100644
index 00000000..78918c14
--- /dev/null
+++ b/pkg/config/kube/conversion.go
@@ -0,0 +1,67 @@
+package kube
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/config/protocol"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       corev1 "k8s.io/api/core/v1"
+       "strings"
+)
+
+// grpcWeb is the string form of protocol.GRPCWeb, and grpcWebLen is its length
+// in bytes; both are computed once at package initialisation.
+var (
+       grpcWeb    = string(protocol.GRPCWeb)
+       grpcWebLen = len(grpcWeb)
+)
+
+// DNS is the port number 53.
+const (
+       DNS = 53
+)
+
+// wellKnownPorts is the set of port numbers treated as well known. It holds
+// only DNS.
+var wellKnownPorts = sets.New[int32](DNS)
+
+// ConvertProtocol determines the protocol for a Kubernetes service port.
+//
+// UDP ports always yield protocol.UDP. Otherwise the name to interpret is
+// appProto when it is non-nil, or portName when it is nil. The Kubernetes
+// standard appProtocol values are mapped directly, a case-insensitive prefix
+// equal to grpcWeb yields protocol.GRPCWeb, and anything else is parsed from
+// the text before the first '-'. When parsing gives protocol.Unsupported and
+// the port is in wellKnownPorts, protocol.TCP is returned instead.
+func ConvertProtocol(port int32, portName string, proto corev1.Protocol, appProto *string) protocol.Instance {
+       if proto == corev1.ProtocolUDP {
+               return protocol.UDP
+       }
+
+       // The application protocol, when present, wins over the port name.
+       candidate := portName
+       if appProto != nil {
+               candidate = *appProto
+               // Standard names Kubernetes defines for AppProtocol only; a port
+               // name cannot take these values anyway.
+               // https://github.com/kubernetes/kubernetes/blob/b4140391cf39ea54cd0227e294283bfb5718c22d/staging/src/k8s.io/api/core/v1/generated.proto#L1245-L1248
+               switch candidate {
+               case "kubernetes.io/h2c": // HTTP/2 over cleartext
+                       return protocol.HTTP2
+               case "kubernetes.io/ws": // WebSocket over cleartext
+                       return protocol.HTTP
+               case "kubernetes.io/wss": // WebSocket over TLS
+                       return protocol.HTTPS
+               }
+       }
+
+       // The grpc-web prefix itself contains a hyphen, so it has to be
+       // recognised before the name is cut at the first '-'.
+       if len(candidate) >= grpcWebLen && strings.EqualFold(candidate[:grpcWebLen], grpcWeb) {
+               return protocol.GRPCWeb
+       }
+
+       // Only the text before the first hyphen, if any, names the protocol.
+       prefix, _, _ := strings.Cut(candidate, "-")
+       parsed := protocol.Parse(prefix)
+       if parsed == protocol.Unsupported && wellKnownPorts.Contains(port) {
+               // Well-known ports default to TCP when no protocol is specified.
+               return protocol.TCP
+       }
+       return parsed
+}
diff --git a/pkg/config/schema/collections/collections.go 
b/pkg/config/schema/collections/collections.go
index 457283ad..e94dc2d4 100644
--- a/pkg/config/schema/collections/collections.go
+++ b/pkg/config/schema/collections/collections.go
@@ -9,6 +9,8 @@ import (
        istioioapinetworkingv1alpha3 "istio.io/api/networking/v1alpha3"
        istioioapisecurityv1beta1 "istio.io/api/security/v1beta1"
        k8sioapiadmissionregistrationv1 "k8s.io/api/admissionregistration/v1"
+       k8sioapicorev1 "k8s.io/api/core/v1"
+       k8sioapidiscoveryv1 "k8s.io/api/discovery/v1"
        "reflect"
 )
 
@@ -105,12 +107,55 @@ var (
                Synthetic:     false,
                Builtin:       true,
        }.MustBuild()
+       EndpointSlice = collection.Builder{
+               Identifier:    "EndpointSlice",
+               Group:         "discovery.k8s.io",
+               Kind:          "EndpointSlice",
+               Plural:        "endpointslices",
+               Version:       "v1",
+               Proto:         "k8s.io.api.discovery.v1.EndpointSlice",
+               ReflectType:   
reflect.TypeOf(&k8sioapidiscoveryv1.EndpointSlice{}).Elem(),
+               ProtoPackage:  "k8s.io/api/discovery/v1",
+               ClusterScoped: false,
+               Synthetic:     false,
+               Builtin:       true,
+       }.MustBuild()
+
+       Endpoints = collection.Builder{
+               Identifier:    "Endpoints",
+               Group:         "",
+               Kind:          "Endpoints",
+               Plural:        "endpoints",
+               Version:       "v1",
+               Proto:         "k8s.io.api.core.v1.Endpoints",
+               ReflectType:   
reflect.TypeOf(&k8sioapicorev1.Endpoints{}).Elem(),
+               ProtoPackage:  "k8s.io/api/core/v1",
+               ClusterScoped: false,
+               Synthetic:     false,
+               Builtin:       true,
+       }.MustBuild()
+       Service = collection.Builder{
+               Identifier: "Service",
+               Group:      "",
+               Kind:       "Service",
+               Plural:     "services",
+               Version:    "v1",
+               Proto:      "k8s.io.api.core.v1.ServiceSpec", StatusProto: 
"k8s.io.api.core.v1.ServiceStatus",
+               ReflectType: 
reflect.TypeOf(&k8sioapicorev1.ServiceSpec{}).Elem(), StatusType: 
reflect.TypeOf(&k8sioapicorev1.ServiceStatus{}).Elem(),
+               ProtoPackage: "k8s.io/api/core/v1", StatusPackage: 
"k8s.io/api/core/v1",
+               ClusterScoped: false,
+               Synthetic:     false,
+               Builtin:       true,
+       }.MustBuild()
 
        All = collection.NewSchemasBuilder().
                MustAdd(PeerAuthentication).
                MustAdd(RequestAuthentication).
                MustAdd(DestinationRule).
                MustAdd(VirtualService).
+               MustAdd(EndpointSlice).
+               MustAdd(Endpoints).
+               MustAdd(Service).
                MustAdd(MutatingWebhookConfiguration).
                MustAdd(ValidatingWebhookConfiguration).
                Build()
diff --git a/pkg/config/schema/gvk/resources.go 
b/pkg/config/schema/gvk/resources.go
index 8fb566d9..36dc2a91 100644
--- a/pkg/config/schema/gvk/resources.go
+++ b/pkg/config/schema/gvk/resources.go
@@ -42,6 +42,8 @@ var (
        AuthorizationPolicy            = config.GroupVersionKind{Group: 
"security.dubbo.io", Version: "v1", Kind: "AuthorizationPolicy"}
        DestinationRule                = config.GroupVersionKind{Group: 
"networking.dubbo.io", Version: "v1", Kind: "DestinationRule"}
        VirtualService                 = config.GroupVersionKind{Group: 
"networking.dubbo.io", Version: "v1", Kind: "VirtualService"}
+       EndpointSlice                  = config.GroupVersionKind{Group: 
"discovery.k8s.io", Version: "v1", Kind: "EndpointSlice"}
+       Endpoints                      = config.GroupVersionKind{Group: "", 
Version: "v1", Kind: "Endpoints"}
 )
 
 func ToGVR(g config.GroupVersionKind) (schema.GroupVersionResource, bool) {
@@ -82,6 +84,10 @@ func ToGVR(g config.GroupVersionKind) 
(schema.GroupVersionResource, bool) {
                return gvr.DestinationRule, true
        case VirtualService:
                return gvr.VirtualService, true
+       case EndpointSlice:
+               return gvr.EndpointSlice, true
+       case Endpoints:
+               return gvr.Endpoints, true
        }
        return schema.GroupVersionResource{}, false
 }
@@ -121,6 +127,12 @@ func FromGVR(g schema.GroupVersionResource) 
(config.GroupVersionKind, bool) {
                return VirtualService, true
        case gvr.DestinationRule:
                return DestinationRule, true
+       case gvr.EndpointSlice:
+               return EndpointSlice, true
+       case gvr.Endpoints:
+               return Endpoints, true
+       case gvr.Service:
+               return Service, true
        }
        return config.GroupVersionKind{}, false
 }
diff --git a/pkg/config/schema/gvr/resources.go 
b/pkg/config/schema/gvr/resources.go
index 35f8c75e..86f1d054 100644
--- a/pkg/config/schema/gvr/resources.go
+++ b/pkg/config/schema/gvr/resources.go
@@ -40,6 +40,8 @@ var (
        AuthorizationPolicy            = schema.GroupVersionResource{Group: 
"security.dubbo.io", Version: "v1", Resource: "authorizationpolicies"}
        DestinationRule                = schema.GroupVersionResource{Group: 
"networking.dubbo.io", Version: "v1", Resource: "destinationrules"}
        VirtualService                 = schema.GroupVersionResource{Group: 
"networking.dubbo.io", Version: "v1", Resource: "virtualservices"}
+       EndpointSlice                  = schema.GroupVersionResource{Group: 
"discovery.k8s.io", Version: "v1", Resource: "endpointslices"}
+       Endpoints                      = schema.GroupVersionResource{Group: "", 
Version: "v1", Resource: "endpoints"}
 )
 
 func IsClusterScoped(g schema.GroupVersionResource) bool {
@@ -74,6 +76,10 @@ func IsClusterScoped(g schema.GroupVersionResource) bool {
                return true
        case ValidatingWebhookConfiguration:
                return true
+       case EndpointSlice:
+               return false
+       case Endpoints:
+               return false
        }
        return false
 }
diff --git a/pkg/config/schema/kind/resources.go 
b/pkg/config/schema/kind/resources.go
index 80ce5182..de7dab64 100644
--- a/pkg/config/schema/kind/resources.go
+++ b/pkg/config/schema/kind/resources.go
@@ -19,6 +19,7 @@ const (
        RequestAuthentication
        VirtualService
        DestinationRule
+       DNSName
 )
 
 func (k Kind) String() string {
@@ -57,6 +58,8 @@ func (k Kind) String() string {
                return "VirtualService"
        case DestinationRule:
                return "DestinationRule"
+       case DNSName:
+               return "DNSName"
        default:
                return "Unknown"
        }
diff --git a/pkg/config/schema/kubeclient/resources.go 
b/pkg/config/schema/kubeclient/resources.go
index d06deac2..ff003d02 100644
--- a/pkg/config/schema/kubeclient/resources.go
+++ b/pkg/config/schema/kubeclient/resources.go
@@ -30,6 +30,7 @@ import (
        k8sioapiappsv1 "k8s.io/api/apps/v1"
        k8sioapicertificatesv1 "k8s.io/api/certificates/v1"
        k8sioapicorev1 "k8s.io/api/core/v1"
+       k8sioapidiscoveryv1 "k8s.io/api/discovery/v1"
        k8sioapipolicyv1 "k8s.io/api/policy/v1"
        k8sioapiextensionsapiserverpkgapisapiextensionsv1 
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -97,6 +98,10 @@ func gvrToObject(g schema.GroupVersionResource) 
runtime.Object {
                return &k8sioapicorev1.Namespace{}
        case gvr.Secret:
                return &k8sioapicorev1.Secret{}
+       case gvr.EndpointSlice:
+               return &k8sioapidiscoveryv1.EndpointSlice{}
+       case gvr.Endpoints:
+               return &k8sioapicorev1.Endpoints{}
        case gvr.Service:
                return &k8sioapicorev1.Service{}
        case gvr.ServiceAccount:
@@ -167,6 +172,20 @@ func getInformerFiltered(c ClientGetter, opts 
ktypes.InformerOptions, g schema.G
                w = func(options metav1.ListOptions) (watch.Interface, error) {
                        return 
c.Kube().CoreV1().Secrets(opts.Namespace).Watch(context.Background(), options)
                }
+       case gvr.EndpointSlice:
+               l = func(options metav1.ListOptions) (runtime.Object, error) {
+                       return 
c.Kube().DiscoveryV1().EndpointSlices(opts.Namespace).List(context.Background(),
 options)
+               }
+               w = func(options metav1.ListOptions) (watch.Interface, error) {
+                       return 
c.Kube().DiscoveryV1().EndpointSlices(opts.Namespace).Watch(context.Background(),
 options)
+               }
+       case gvr.Endpoints:
+               l = func(options metav1.ListOptions) (runtime.Object, error) {
+                       return 
c.Kube().CoreV1().Endpoints(opts.Namespace).List(context.Background(), options)
+               }
+               w = func(options metav1.ListOptions) (watch.Interface, error) {
+                       return 
c.Kube().CoreV1().Endpoints(opts.Namespace).Watch(context.Background(), options)
+               }
        case gvr.Service:
                l = func(options metav1.ListOptions) (runtime.Object, error) {
                        return 
c.Kube().CoreV1().Services(opts.Namespace).List(context.Background(), options)
diff --git a/pkg/config/schema/kubetypes/resources.go 
b/pkg/config/schema/kubetypes/resources.go
index fd826128..562e8a54 100644
--- a/pkg/config/schema/kubetypes/resources.go
+++ b/pkg/config/schema/kubetypes/resources.go
@@ -25,6 +25,7 @@ import (
        apiistioioapisecurityv1 "istio.io/client-go/pkg/apis/security/v1"
        k8sioapiadmissionregistrationv1 "k8s.io/api/admissionregistration/v1"
        k8sioapicorev1 "k8s.io/api/core/v1"
+       k8sioapidiscoveryv1 "k8s.io/api/discovery/v1"
 )
 
 func getGvk(obj any) (config.GroupVersionKind, bool) {
@@ -49,6 +50,12 @@ func getGvk(obj any) (config.GroupVersionKind, bool) {
                return gvk.MutatingWebhookConfiguration, true
        case *k8sioapiadmissionregistrationv1.ValidatingWebhookConfiguration:
                return gvk.ValidatingWebhookConfiguration, true
+       case *k8sioapidiscoveryv1.EndpointSlice:
+               return gvk.EndpointSlice, true
+       case *k8sioapicorev1.Endpoints:
+               return gvk.Endpoints, true
+       case *k8sioapicorev1.Service:
+               return gvk.Service, true
        default:
                return config.GroupVersionKind{}, false
        }
diff --git a/pkg/kube/multicluster/cluster.go b/pkg/kube/multicluster/cluster.go
new file mode 100644
index 00000000..932becd8
--- /dev/null
+++ b/pkg/kube/multicluster/cluster.go
@@ -0,0 +1,23 @@
+package multicluster
+
+import (
+       "crypto/sha256"
+       "github.com/apache/dubbo-kubernetes/pkg/cluster"
+       "github.com/apache/dubbo-kubernetes/pkg/kube"
+       "go.uber.org/atomic"
+)
+
+// Cluster pairs a cluster ID with the client used to access that cluster, plus
+// unexported bookkeeping fields.
+type Cluster struct {
+       // ID of the cluster.
+       ID cluster.ID
+       // Client for accessing the cluster.
+       Client kube.Client
+
+       // kubeConfigSha is a SHA-256-sized digest; presumably of the kubeconfig
+       // this cluster was created from — TODO confirm, it is not set in this file.
+       kubeConfigSha [sha256.Size]byte
+
+       // stop is presumably closed to shut the cluster down — TODO confirm, it is
+       // not used in this file.
+       stop chan struct{}
+       // initialSync is marked when RunAndWait completes
+       initialSync *atomic.Bool
+       // initialSyncTimeout is set when RunAndWait timed out
+       initialSyncTimeout *atomic.Bool
+}
diff --git a/pkg/kube/multicluster/component.go 
b/pkg/kube/multicluster/component.go
new file mode 100644
index 00000000..a673ce1a
--- /dev/null
+++ b/pkg/kube/multicluster/component.go
@@ -0,0 +1,65 @@
+package multicluster
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/cluster"
+       "github.com/apache/dubbo-kubernetes/pkg/maps"
+       "sync"
+)
+
+// ComponentConstraint is the behaviour required of a per-cluster component: it
+// can be closed and it can report whether it has synced.
+type ComponentConstraint interface {
+       Close()
+       HasSynced() bool
+}
+
+// Component holds one T per cluster, keyed by cluster ID. New values are
+// produced by constructor; mu is held around writes to clusters and around
+// reads of it in All.
+type Component[T ComponentConstraint] struct {
+       mu          sync.RWMutex
+       constructor func(cluster *Cluster) T
+       clusters    map[cluster.ID]T
+}
+
+// clusterAdded constructs the component for the given cluster and stores it
+// under the cluster's ID. The constructor runs before the lock is taken.
+func (m *Component[T]) clusterAdded(cluster *Cluster) ComponentConstraint {
+       created := m.constructor(cluster)
+
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       m.clusters[cluster.ID] = created
+       return created
+}
+
+// clusterUpdated builds a replacement component for the given cluster, swaps
+// it in under the cluster's ID, and closes the component it replaced, if any.
+// It returns the new component.
+func (m *Component[T]) clusterUpdated(cluster *Cluster) ComponentConstraint {
+       // Build outside of the lock, in case it's slow.
+       comp := m.constructor(cluster)
+
+       // Look up the previous component and install the new one in a single
+       // critical section: reading the map without holding the lock would race
+       // with concurrent writers.
+       m.mu.Lock()
+       old, f := m.clusters[cluster.ID]
+       m.clusters[cluster.ID] = comp
+       m.mu.Unlock()
+
+       // Close outside of the lock, in case it's slow.
+       if f {
+               old.Close()
+       }
+       return comp
+}
+
+// clusterDeleted closes the component stored for the given cluster ID, when
+// there is one, and removes the entry. Both steps happen under the write lock.
+func (m *Component[T]) clusterDeleted(cluster cluster.ID) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       previous, found := m.clusters[cluster]
+       if found {
+               previous.Close()
+       }
+       delete(m.clusters, cluster)
+}
+
+// All returns the components currently stored for every cluster, collected
+// while holding the read lock.
+func (m *Component[T]) All() []T {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+
+       components := maps.Values(m.clusters)
+       return components
+}
+
+// HasSynced reports whether every component returned by All has synced. It
+// returns true when there are no components.
+func (m *Component[T]) HasSynced() bool {
+       components := m.All()
+       for i := range components {
+               if synced := components[i].HasSynced(); !synced {
+                       return false
+               }
+       }
+       return true
+}
diff --git a/pkg/kube/multicluster/secretcontroller.go 
b/pkg/kube/multicluster/secretcontroller.go
new file mode 100644
index 00000000..28d82b85
--- /dev/null
+++ b/pkg/kube/multicluster/secretcontroller.go
@@ -0,0 +1,95 @@
+package multicluster
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/cluster"
+       "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+       "github.com/apache/dubbo-kubernetes/pkg/kube"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/client-go/rest"
+       "sync"
+)
+
+// ClusterStore is a set of strings with an embedded read/write mutex.
+// NOTE(review): presumably the set holds cluster identifiers and the mutex
+// guards it — confirm; nothing in this file reads or writes it.
+type ClusterStore struct {
+       sync.RWMutex
+       clusters sets.String
+}
+
+// Controller owns the config cluster and the handlers that are notified when
+// clusters are added or deleted.
+type Controller struct {
+       // namespace, configClusterID and configCluster are set by NewController;
+       // configClusterSyncers is filled in by Run from the handlers' results.
+       namespace            string
+       configClusterID      cluster.ID
+       configCluster        *Cluster
+       configClusterSyncers []ComponentConstraint
+
+       // Only configOverrides in this group is set by NewController; queue and
+       // secrets are not initialised anywhere in this file.
+       queue           controllers.Queue
+       secrets         kclient.Client[*corev1.Secret]
+       configOverrides []func(*rest.Config)
+
+       // cs is not initialised anywhere in this file.
+       cs *ClusterStore
+
+       // handlers is appended to by registerHandler.
+       meshWatcher mesh.Watcher
+       handlers    []handler
+}
+
+// NewController returns a Controller for the given namespace whose config
+// cluster wraps kubeclientset under clusterID. The mesh watcher and the
+// optional rest.Config overrides are stored on the controller as given.
+func NewController(kubeclientset kube.Client, namespace string, clusterID cluster.ID,
+       meshWatcher mesh.Watcher, configOverrides ...func(*rest.Config),
+) *Controller {
+       configCluster := &Cluster{ID: clusterID, Client: kubeclientset}
+       return &Controller{
+               namespace:       namespace,
+               configClusterID: clusterID,
+               configCluster:   configCluster,
+               configOverrides: configOverrides,
+               meshWatcher:     meshWatcher,
+       }
+}
+
+// Run notifies every registered handler about the config cluster and keeps the
+// components they return. It always returns nil; stopCh is not used.
+func (c *Controller) Run(stopCh <-chan struct{}) error {
+       // The config cluster is handled synchronously rather than in a goroutine,
+       // so other Run/start functions are blocked until it is registered. This
+       // *Cluster is not stored in the ClusterStore and gets no SyncTimeout.
+       syncers := c.handleAdd(c.configCluster)
+       c.configClusterSyncers = syncers
+
+       return nil
+}
+
+// handleAdd calls clusterAdded on every registered handler, in registration
+// order, and returns the resulting components in that same order.
+func (c *Controller) handleAdd(cluster *Cluster) []ComponentConstraint {
+       registered := c.handlers
+       results := make([]ComponentConstraint, len(registered))
+       for i, h := range registered {
+               results[i] = h.clusterAdded(cluster)
+       }
+       return results
+}
+
+// handleDelete calls clusterDeleted with the given cluster ID on every
+// registered handler, in registration order.
+func (c *Controller) handleDelete(key cluster.ID) {
+       registered := c.handlers
+       for i := range registered {
+               registered[i].clusterDeleted(key)
+       }
+}
+
+// handler receives cluster lifecycle notifications from the Controller.
+// clusterAdded and clusterUpdated return the component built for the cluster;
+// clusterDeleted takes only the cluster's ID.
+type handler interface {
+       clusterAdded(cluster *Cluster) ComponentConstraint
+       clusterUpdated(cluster *Cluster) ComponentConstraint
+       clusterDeleted(clusterID cluster.ID)
+       HasSynced() bool
+}
+
+// ComponentBuilder is anything a handler can be registered with. Its only
+// method is unexported, so it can be implemented only inside this package.
+type ComponentBuilder interface {
+       registerHandler(h handler)
+}
+
+// BuildMultiClusterComponent creates an empty Component that uses constructor
+// to build its per-cluster values, registers it as a handler on c, and returns
+// it.
+func BuildMultiClusterComponent[T ComponentConstraint](c ComponentBuilder, constructor func(cluster *Cluster) T) *Component[T] {
+       component := new(Component[T])
+       component.constructor = constructor
+       component.clusters = make(map[cluster.ID]T)
+
+       c.registerHandler(component)
+       return component
+}
+
+// registerHandler appends h to the controller's list of handlers.
+func (c *Controller) registerHandler(h handler) {
+       // Intentionally no lock. The controller today requires that handlers
+       // are registered before execution and not in parallel.
+       c.handlers = append(c.handlers, h)
+}
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index d601fcd0..6207b934 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -34,6 +34,7 @@ import (
        kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
        "github.com/apache/dubbo-kubernetes/pkg/kube/inject"
        "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/multicluster"
        "github.com/apache/dubbo-kubernetes/pkg/kube/namespace"
        sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
        "github.com/apache/dubbo-kubernetes/pkg/network"
@@ -46,6 +47,7 @@ import (
        "github.com/apache/dubbo-kubernetes/sail/pkg/server"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+       
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/serviceentry"
        tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
        "github.com/apache/dubbo-kubernetes/sail/pkg/xds"
        "github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
@@ -94,8 +96,10 @@ type Server struct {
        httpMux     *http.ServeMux
        httpsMux    *http.ServeMux // webhooks
 
-       ConfigStores     []model.ConfigStoreController
-       configController model.ConfigStoreController
+       ConfigStores           []model.ConfigStoreController
+       configController       model.ConfigStoreController
+       multiclusterController *multicluster.Controller
+       serviceEntryController *serviceentry.Controller
 
        fileWatcher         filewatcher.FileWatcher
        internalStop        chan struct{}
@@ -385,6 +389,19 @@ func (s *Server) startCA(caOpts *caOptions) {
        })
 }
 
+// initMulticluster creates the multicluster controller and registers a start
+// function that runs it. It does nothing when there is no kube client.
+func (s *Server) initMulticluster(args *SailArgs) {
+       if s.kubeClient == nil {
+               return
+       }
+
+       // Override applied to rest.Config values: copies the configured API QPS
+       // and burst settings onto the config.
+       applyRateLimits := func(r *rest.Config) {
+               r.QPS = args.RegistryOptions.KubeOptions.KubernetesAPIQPS
+               r.Burst = args.RegistryOptions.KubeOptions.KubernetesAPIBurst
+       }
+       s.multiclusterController = multicluster.NewController(s.kubeClient, args.Namespace, s.clusterID, s.environment.Watcher, applyRateLimits)
+
+       s.addStartFunc("multicluster controller", func(stop <-chan struct{}) error {
+               return s.multiclusterController.Run(stop)
+       })
+}
+
 func (s *Server) initKubeClient(args *SailArgs) error {
        if s.kubeClient != nil {
                // Already initialized by startup arguments
@@ -479,7 +496,8 @@ func (s *Server) initGrpcServer(options 
*dubbokeepalive.Options) {
 
 func (s *Server) initControllers(args *SailArgs) error {
        klog.Info("initializing controllers")
-       // TODO initMulticluster
+
+       s.initMulticluster(args)
 
        s.initSDSServer()
 
@@ -673,12 +691,6 @@ func (s *Server) initSDSServer() {
                klog.Warningf("skipping Kubernetes credential reader; 
SAIL_ENABLE_XDS_IDENTITY_CHECK must be set to true for this feature.")
        } else {
                // TODO ConfigUpdated Multicluster get secret and configmap
-               s.XDSServer.ConfigUpdate(&model.PushRequest{
-                       Full:           false,
-                       ConfigsUpdated: nil,
-                       Reason:         
model.NewReasonStats(model.SecretTrigger),
-               })
-
        }
 }
 
@@ -827,6 +839,23 @@ func (s *Server) dubbodReadyHandler(w http.ResponseWriter, 
_ *http.Request) {
        w.WriteHeader(http.StatusOK)
 }
 
+// shouldStartNsController reports whether the namespace controller should be
+// started. The checks run in order: Kubernetes signing enables it, a missing
+// CA disables it, and a cert provider of "none" disables it; otherwise it is
+// enabled.
+func (s *Server) shouldStartNsController() bool {
+       switch {
+       case s.isK8SSigning():
+               // Need to distribute the roots from MeshConfig.
+               return true
+       case s.CA == nil:
+               return false
+       case features.SailCertProvider == constants.CertProviderNone:
+               // With no CA there is no cert, so nothing is distributed either.
+               return false
+       default:
+               return true
+       }
+}
+
 func getDNSNames(args *SailArgs, host string) []string {
        // Append custom hostname if there is any
        customHost := features.DubbodServiceCustomHost
diff --git a/sail/pkg/bootstrap/servicecontroller.go 
b/sail/pkg/bootstrap/servicecontroller.go
index 70cac1bd..cf59a83d 100644
--- a/sail/pkg/bootstrap/servicecontroller.go
+++ b/sail/pkg/bootstrap/servicecontroller.go
@@ -4,6 +4,7 @@ import (
        "fmt"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+       kubecontroller 
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/kube/controller"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
        "k8s.io/klog/v2"
 )
@@ -55,5 +56,13 @@ func (s *Server) initKubeRegistry(args *SailArgs) (err 
error) {
        args.RegistryOptions.KubeOptions.MeshWatcher = s.environment.Watcher
        args.RegistryOptions.KubeOptions.SystemNamespace = args.Namespace
        args.RegistryOptions.KubeOptions.MeshServiceController = 
s.ServiceController()
-       return
+       kubecontroller.NewMulticluster(args.PodName,
+               args.RegistryOptions.KubeOptions,
+               s.dubbodCertBundleWatcher,
+               args.Revision,
+               s.shouldStartNsController(),
+               s.environment.ClusterLocal(),
+               s.server,
+               s.multiclusterController)
+       return err
 }
diff --git a/sail/pkg/config/kube/crdclient/types.go 
b/sail/pkg/config/kube/crdclient/types.go
index 778b833d..10f2fcbf 100644
--- a/sail/pkg/config/kube/crdclient/types.go
+++ b/sail/pkg/config/kube/crdclient/types.go
@@ -14,6 +14,7 @@ import (
        k8sioapiadmissionregistrationv1 "k8s.io/api/admissionregistration/v1"
        k8sioapiappsv1 "k8s.io/api/apps/v1"
        k8sioapicorev1 "k8s.io/api/core/v1"
+       k8sioapidiscoveryv1 "k8s.io/api/discovery/v1"
        k8sioapiextensionsapiserverpkgapisapiextensionsv1 
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
@@ -466,4 +467,22 @@ var translationMap = map[config.GroupVersionKind]func(r 
runtime.Object) config.C
                        Status: &obj.Status,
                }
        },
+       gvk.EndpointSlice: func(r runtime.Object) config.Config {
+               obj := r.(*k8sioapidiscoveryv1.EndpointSlice)
+               return config.Config{
+                       Meta: config.Meta{
+                               GroupVersionKind:  gvk.EndpointSlice,
+                               Name:              obj.Name,
+                               Namespace:         obj.Namespace,
+                               Labels:            obj.Labels,
+                               Annotations:       obj.Annotations,
+                               ResourceVersion:   obj.ResourceVersion,
+                               CreationTimestamp: obj.CreationTimestamp.Time,
+                               OwnerReferences:   obj.OwnerReferences,
+                               UID:               string(obj.UID),
+                               Generation:        obj.Generation,
+                       },
+                       Spec: obj,
+               }
+       },
 }
diff --git a/sail/pkg/model/addressmap.go b/sail/pkg/model/addressmap.go
index c8b166f3..01623908 100644
--- a/sail/pkg/model/addressmap.go
+++ b/sail/pkg/model/addressmap.go
@@ -73,3 +73,20 @@ func (m *AddressMap) Len() int {
 
        return len(m.Addresses)
 }
+
+// AddAddressesFor appends the given addresses to those already recorded for
+// cluster c and returns the receiver so calls can be chained. An empty
+// addresses slice leaves the map untouched and takes no lock.
+func (m *AddressMap) AddAddressesFor(c cluster.ID, addresses []string) *AddressMap {
+       if len(addresses) == 0 {
+               return m
+       }
+
+       m.mutex.Lock()
+       defer m.mutex.Unlock()
+
+       // Allocate the map lazily on first use.
+       if m.Addresses == nil {
+               m.Addresses = map[cluster.ID][]string{}
+       }
+
+       existing := m.Addresses[c]
+       m.Addresses[c] = append(existing, addresses...)
+       return m
+}
diff --git a/sail/pkg/model/controller.go b/sail/pkg/model/controller.go
index 72e2dd10..775ac50d 100644
--- a/sail/pkg/model/controller.go
+++ b/sail/pkg/model/controller.go
@@ -1,10 +1,19 @@
 package model
 
+import "sync"
+
 type Controller interface {
        Run(stop <-chan struct{})
        HasSynced() bool
 }
 
+// ServiceHandler is a callback that receives two services and an event.
+// NotifyServiceHandlers invokes it as (prev, curr, event).
+type ServiceHandler func(*Service, *Service, Event)
+
+// ControllerHandlers holds a list of service handlers together with the mutex
+// that GetServiceHandlers takes when reading the list.
+type ControllerHandlers struct {
+       mutex           sync.RWMutex
+       serviceHandlers []ServiceHandler
+}
+
 type AggregateController interface {
        Controller
 }
@@ -29,3 +38,16 @@ func (event Event) String() string {
        }
        return out
 }
+
+// NotifyServiceHandlers invokes every handler returned by GetServiceHandlers,
+// in order, with the previous service, the current service and the event.
+func (c *ControllerHandlers) NotifyServiceHandlers(prev, curr *Service, event Event) {
+       handlers := c.GetServiceHandlers()
+       for i := range handlers {
+               handlers[i](prev, curr, event)
+       }
+}
+
+// GetServiceHandlers returns the registered service handlers, read while
+// holding the read lock.
+func (c *ControllerHandlers) GetServiceHandlers() []ServiceHandler {
+       c.mutex.RLock()
+       defer c.mutex.RUnlock()
+       // The slice is returned as-is: no elements are copied, so the result
+       // shares its backing array with c.serviceHandlers and callers must not
+       // modify its elements.
+       return c.serviceHandlers
+}
diff --git a/sail/pkg/model/endpointshards.go b/sail/pkg/model/endpointshards.go
index 43e4cbfe..dd87f6fd 100644
--- a/sail/pkg/model/endpointshards.go
+++ b/sail/pkg/model/endpointshards.go
@@ -88,6 +88,44 @@ func endpointUpdateRequiresPush(oldDubboEndpoints 
[]*DubboEndpoint, incomingEndp
        return newDubboEndpoints, needPush
 }
 
+// ShardsForService looks up the EndpointShards registered for serviceName in
+// namespace. The boolean is false when either the service name or the
+// namespace has no entry.
+func (e *EndpointIndex) ShardsForService(serviceName, namespace string) (*EndpointShards, bool) {
+       e.mu.RLock()
+       defer e.mu.RUnlock()
+       // A missing service name yields a nil inner map; reading from a nil map
+       // is safe and reports "not found", so one chained lookup covers both cases.
+       shards, found := e.shardsBySvc[serviceName][namespace]
+       return shards, found
+}
+
+// CopyEndpoints returns the endpoints of every shard grouped by service port number.
+// An endpoint with a non-zero LegacyClusterPortKey is keyed by that value and kept only
+// when ports contains it; any other endpoint is keyed by portMap[ep.ServicePortName] and
+// dropped when that name is absent from portMap.
+func (es *EndpointShards) CopyEndpoints(portMap map[string]int, ports sets.Set[int]) map[int][]*DubboEndpoint {
+       es.RLock()
+       defer es.RUnlock()
+       byPort := make(map[int][]*DubboEndpoint)
+       for _, endpoints := range es.Shards {
+               for _, ep := range endpoints {
+                       // In EDS we match on port *name*, but for historical reasons CDS matches
+                       // on port number, so LegacyClusterPortKey takes precedence when it is set.
+                       var (
+                               portNum int
+                               keep    bool
+                       )
+                       if legacy := ep.LegacyClusterPortKey; legacy != 0 {
+                               portNum, keep = legacy, ports.Contains(legacy)
+                       } else {
+                               portNum, keep = portMap[ep.ServicePortName]
+                       }
+                       if keep {
+                               byPort[portNum] = append(byPort[portNum], ep)
+                       }
+               }
+       }
+       return byPort
+}
+
 func (e *EndpointIndex) UpdateServiceEndpoints(
        shard ShardKey,
        hostname string,
@@ -216,3 +254,12 @@ func (e *EndpointIndex) deleteServiceInner(shard ShardKey, 
serviceName, namespac
        }
        epShards.Unlock()
 }
+
+// shardRegistry is the minimal view of a registry needed to derive its ShardKey.
+type shardRegistry interface {
+       Cluster() cluster.ID
+       Provider() provider.ID
+}
+
+// ShardKeyFromRegistry builds the ShardKey made of the registry's cluster and provider.
+func ShardKeyFromRegistry(instance shardRegistry) ShardKey {
+       key := ShardKey{
+               Cluster:  instance.Cluster(),
+               Provider: instance.Provider(),
+       }
+       return key
+}
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index a6fde7c4..5d0cd60f 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -18,15 +18,20 @@
 package model
 
 import (
+       "cmp"
        "github.com/apache/dubbo-kubernetes/pkg/config"
        "github.com/apache/dubbo-kubernetes/pkg/config/host"
        "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
        "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+       "github.com/apache/dubbo-kubernetes/pkg/slices"
+       "github.com/apache/dubbo-kubernetes/pkg/spiffe"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/pkg/xds"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
        "go.uber.org/atomic"
        meshconfig "istio.io/api/mesh/v1alpha1"
        "k8s.io/apimachinery/pkg/types"
+       "k8s.io/klog/v2"
        "sync"
        "time"
 )
@@ -40,11 +45,10 @@ var (
 type TriggerReason string
 
 const (
-       EndpointUpdate TriggerReason = "endpoint"
-       UnknownTrigger TriggerReason = "unknown"
-       ProxyRequest   TriggerReason = "proxyrequest"
-       GlobalUpdate   TriggerReason = "global"
-       SecretTrigger  TriggerReason = "secret"
+       UnknownTrigger         TriggerReason = "unknown"
+       ProxyRequest           TriggerReason = "proxyrequest"
+       GlobalUpdate           TriggerReason = "global"
+       HeadlessEndpointUpdate TriggerReason = "headlessendpoint"
 )
 
 type ProxyPushStatus struct {
@@ -102,7 +106,20 @@ type PushRequest struct {
 type ResourceDelta = xds.ResourceDelta
 
 func NewPushContext() *PushContext {
-       return &PushContext{}
+       return &PushContext{
+               ServiceIndex:    newServiceIndex(),
+               serviceAccounts: map[serviceAccountKey][]string{},
+       }
+}
+
+// newServiceIndex returns a serviceIndex whose service slice and lookup maps
+// are all allocated and empty, so they can be written to without nil checks.
+func newServiceIndex() serviceIndex {
+       var idx serviceIndex
+       idx.public = make([]*Service, 0)
+       idx.privateByNamespace = make(map[string][]*Service)
+       idx.exportedToNamespace = make(map[string][]*Service)
+       idx.HostnameAndNamespace = make(map[host.Name]map[string]*Service)
+       idx.instancesByPort = make(map[string]map[int][]*DubboEndpoint)
+       return idx
 }
 
 type ConfigKey struct {
@@ -130,8 +147,8 @@ func (pr *PushRequest) CopyMerge(other *PushRequest) 
*PushRequest {
 }
 
 type XDSUpdater interface {
-       EDSUpdate(shard ShardKey, hostname string, namespace string, entry 
[]*DubboEndpoint)
        ConfigUpdate(req *PushRequest)
+       SvcUpdate(shard ShardKey, hostname string, namespace string, event 
Event)
 }
 
 func (ps *PushContext) InitContext(env *Environment, oldPushContext 
*PushContext, pushReq *PushRequest) {
@@ -189,9 +206,20 @@ func (ps *PushContext) initDefaultExportMaps() {
        }
 }
 
-func (ps *PushContext) createNewContext(env *Environment) {}
+func (ps *PushContext) createNewContext(env *Environment) {
+       ps.initServiceRegistry(env, nil)
+}
 
 func (ps *PushContext) updateContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
+       // NOTE(review): servicesChanged is declared but never assigned, so it is
+       // always false and the initServiceRegistry branch below is unreachable;
+       // every update reuses the old context's ServiceIndex and serviceAccounts.
+       // Presumably it should be derived from pushReq.ConfigsUpdated — confirm.
+       var servicesChanged bool
+       if servicesChanged {
+               // Services have changed. initialize service registry
+               ps.initServiceRegistry(env, pushReq.ConfigsUpdated)
+       } else {
+               // make sure we copy over things that would be generated in initServiceRegistry
+               ps.ServiceIndex = oldPushContext.ServiceIndex
+               ps.serviceAccounts = oldPushContext.serviceAccounts
+       }
 }
 
 func (pr *PushRequest) Merge(other *PushRequest) *PushRequest {
@@ -287,6 +315,223 @@ func (ps *PushContext) servicesExportedToNamespace(ns 
string) []*Service {
        return out
 }
 
+func (ps *PushContext) initServiceRegistry(env *Environment, configsUpdate 
sets.Set[ConfigKey]) {
+       // Sort the services in order of creation.
+       allServices := SortServicesByCreationTime(env.Services())
+       resolveServiceAliases(allServices, configsUpdate)
+
+       for _, s := range allServices {
+               portMap := map[string]int{}
+               ports := sets.New[int]()
+               for _, port := range s.Ports {
+                       portMap[port.Name] = port.Port
+                       ports.Insert(port.Port)
+               }
+
+               svcKey := s.Key()
+               if _, ok := ps.ServiceIndex.instancesByPort[svcKey]; !ok {
+                       ps.ServiceIndex.instancesByPort[svcKey] = 
make(map[int][]*DubboEndpoint)
+               }
+               shards, ok := 
env.EndpointIndex.ShardsForService(string(s.Hostname), s.Attributes.Namespace)
+               if ok {
+                       instancesByPort := shards.CopyEndpoints(portMap, ports)
+                       // Iterate over the instances and add them to the 
service index to avoid overriding the existing port instances.
+                       for port, instances := range instancesByPort {
+                               ps.ServiceIndex.instancesByPort[svcKey][port] = 
instances
+                       }
+               }
+               if _, f := ps.ServiceIndex.HostnameAndNamespace[s.Hostname]; !f 
{
+                       ps.ServiceIndex.HostnameAndNamespace[s.Hostname] = 
map[string]*Service{}
+               }
+               // In some scenarios, there may be multiple Services defined 
for the same hostname due to ServiceEntry allowing
+               // arbitrary hostnames. In these cases, we want to pick the 
first Service, which is the oldest. This ensures
+               // newly created Services cannot take ownership unexpectedly.
+               // However, if the Service is from Kubernetes it should take precedence over ones that are not. This prevents someone from
+               // "domain squatting" on the hostname before a Kubernetes Service is created.
+               if existing := 
ps.ServiceIndex.HostnameAndNamespace[s.Hostname][s.Attributes.Namespace]; 
existing != nil &&
+                       !(existing.Attributes.ServiceRegistry != 
provider.Kubernetes && s.Attributes.ServiceRegistry == provider.Kubernetes) {
+                       klog.V(2).Infof("Service %s/%s from registry %s ignored 
by %s/%s/%s", s.Attributes.Namespace, s.Hostname, s.Attributes.ServiceRegistry,
+                               existing.Attributes.ServiceRegistry, 
existing.Attributes.Namespace, existing.Hostname)
+               } else {
+                       
ps.ServiceIndex.HostnameAndNamespace[s.Hostname][s.Attributes.Namespace] = s
+               }
+
+               ns := s.Attributes.Namespace
+               if s.Attributes.ExportTo.IsEmpty() {
+                       if 
ps.exportToDefaults.service.Contains(visibility.Private) {
+                               ps.ServiceIndex.privateByNamespace[ns] = 
append(ps.ServiceIndex.privateByNamespace[ns], s)
+                       } else if 
ps.exportToDefaults.service.Contains(visibility.Public) {
+                               ps.ServiceIndex.public = 
append(ps.ServiceIndex.public, s)
+                       }
+               } else {
+                       // if service has exportTo *, make it public and ignore 
all other exportTos.
+                       // if service does not have exportTo *, but has 
exportTo ~ - i.e. not visible to anyone, ignore all exportTos.
+                       // if service has exportTo ., replace with current 
namespace.
+                       if s.Attributes.ExportTo.Contains(visibility.Public) {
+                               ps.ServiceIndex.public = 
append(ps.ServiceIndex.public, s)
+                               continue
+                       } else if 
s.Attributes.ExportTo.Contains(visibility.None) {
+                               continue
+                       }
+                       // . or other namespaces
+                       for exportTo := range s.Attributes.ExportTo {
+                               if exportTo == visibility.Private || 
string(exportTo) == ns {
+                                       // exportTo with same namespace is 
effectively private
+                                       ps.ServiceIndex.privateByNamespace[ns] 
= append(ps.ServiceIndex.privateByNamespace[ns], s)
+                               } else {
+                                       // exportTo is a specific target 
namespace
+                                       
ps.ServiceIndex.exportedToNamespace[string(exportTo)] = 
append(ps.ServiceIndex.exportedToNamespace[string(exportTo)], s)
+                               }
+                       }
+               }
+       }
+
+       ps.initServiceAccounts(env, allServices)
+}
+
+// initServiceAccounts records, for every service, the sorted list of service
+// accounts (endpoint-level accounts from the endpoint index merged with the
+// service's own ServiceAccounts, expanded with the mesh trust domain aliases)
+// under the service's hostname/namespace key.
+func (ps *PushContext) initServiceAccounts(env *Environment, services []*Service) {
+       for _, svc := range services {
+               ns := svc.Attributes.Namespace
+
+               // Start from the endpoint-level service accounts, if the service has shards.
+               var accounts sets.String
+               if shard, found := env.EndpointIndex.ShardsForService(string(svc.Hostname), ns); found {
+                       // Copy under the lock and release it immediately: endpoints can update
+                       // frequently, so the longer we hold it the more likely we block others.
+                       shard.RLock()
+                       accounts = shard.ServiceAccounts.Copy()
+                       shard.RUnlock()
+               }
+
+               // Then merge in the accounts declared on the service itself.
+               switch {
+               case len(svc.ServiceAccounts) == 0:
+                       // Nothing to merge.
+               case accounts == nil:
+                       accounts = sets.New(svc.ServiceAccounts...)
+               default:
+                       accounts = accounts.InsertAll(svc.ServiceAccounts...)
+               }
+
+               expanded := spiffe.ExpandWithTrustDomains(accounts, ps.Mesh.TrustDomainAliases)
+               sorted := sets.SortedList(expanded)
+               ps.serviceAccounts[serviceAccountKey{hostname: svc.Hostname, namespace: ns}] = sorted
+       }
+}
+
+// SortServicesByCreationTime stably sorts services in place, oldest first,
+// and returns the same slice.
+func SortServicesByCreationTime(services []*Service) []*Service {
+       // CreationTimestamp is stored in seconds, so ties are not uncommon. To keep the
+       // ordering deterministic we break ties on name and then namespace, which together
+       // are unique.
+       oldestFirst := func(a, b *Service) int {
+               if byTime := a.CreationTime.Compare(b.CreationTime); byTime != 0 {
+                       return byTime
+               }
+               if byName := cmp.Compare(a.Attributes.Name, b.Attributes.Name); byName != 0 {
+                       return byName
+               }
+               return cmp.Compare(a.Attributes.Namespace, b.Attributes.Namespace)
+       }
+       slices.SortStableFunc(services, oldestFirst)
+       return services
+}
+
+func resolveServiceAliases(allServices []*Service, configsUpdated 
sets.Set[ConfigKey]) {
+       // rawAlias builds a map of Service -> AliasFor. So this will be 
ExternalName -> Service.
+       // In an edge case, we can have ExternalName -> ExternalName; we 
resolve that below.
+       rawAlias := map[NamespacedHostname]host.Name{}
+       for _, s := range allServices {
+               if s.Resolution != Alias {
+                       continue
+               }
+               nh := NamespacedHostname{
+                       Hostname:  s.Hostname,
+                       Namespace: s.Attributes.Namespace,
+               }
+               rawAlias[nh] = 
host.Name(s.Attributes.K8sAttributes.ExternalName)
+       }
+
+       // unnamespacedRawAlias is like rawAlias but without namespaces.
+       // This is because an `ExternalName` isn't namespaced. If there is a 
conflict, the behavior is undefined.
+       // This is split from above as a minor optimization to right-size the 
map
+       unnamespacedRawAlias := make(map[host.Name]host.Name, len(rawAlias))
+       for k, v := range rawAlias {
+               unnamespacedRawAlias[k.Hostname] = v
+       }
+
+       // resolvedAliases builds a map of Alias -> Concrete, fully resolving 
through multiple hops.
+       // Ex: Alias1 -> Alias2 -> Concrete will flatten to Alias1 -> Concrete.
+       resolvedAliases := make(map[NamespacedHostname]host.Name, len(rawAlias))
+       for alias, referencedService := range rawAlias {
+               // referencedService may be another alias or a concrete service.
+               if _, f := unnamespacedRawAlias[referencedService]; !f {
+                       // Common case: alias pointing to a concrete service
+                       resolvedAliases[alias] = referencedService
+                       continue
+               }
+               // Otherwise, we need to traverse the alias "graph".
+               // In an obscure edge case, a user could make a loop, so we 
will need to handle that.
+               seen := sets.New(alias.Hostname, referencedService)
+               for {
+                       n, f := unnamespacedRawAlias[referencedService]
+                       if !f {
+                               // The destination we are pointing to is not an 
alias, so this is the terminal step
+                               resolvedAliases[alias] = referencedService
+                               break
+                       }
+                       if seen.InsertContains(n) {
+                               // We did a loop!
+                               // Kubernetes will make these NXDomain, so we 
can just treat it like it doesn't exist at all
+                               break
+                       }
+                       referencedService = n
+               }
+       }
+
+       // aliasesForService builds a map of Concrete -> []Aliases
+       // This basically reverses our resolvedAliases map, which is Alias -> Concrete.
+       aliasesForService := map[host.Name][]NamespacedHostname{}
+       for alias, concrete := range resolvedAliases {
+               aliasesForService[concrete] = 
append(aliasesForService[concrete], alias)
+
+               // We also need to update configsUpdated, such that any "alias" 
updated also marks the concrete service as updated.
+               aliasKey := ConfigKey{
+                       Kind:      kind.ServiceEntry,
+                       Name:      alias.Hostname.String(),
+                       Namespace: alias.Namespace,
+               }
+               // Alias. We should mark all the concrete services as updated 
as well.
+               if configsUpdated.Contains(aliasKey) {
+                       // We only have the hostname, but we need the 
namespace...
+                       for _, svc := range allServices {
+                               if svc.Hostname == concrete {
+                                       configsUpdated.Insert(ConfigKey{
+                                               Kind:      kind.ServiceEntry,
+                                               Name:      concrete.String(),
+                                               Namespace: 
svc.Attributes.Namespace,
+                                       })
+                               }
+                       }
+               }
+       }
+       // Sort aliases so order is deterministic.
+       for _, v := range aliasesForService {
+               slices.SortFunc(v, func(a, b NamespacedHostname) int {
+                       if r := cmp.Compare(a.Namespace, b.Namespace); r != 0 {
+                               return r
+                       }
+                       return cmp.Compare(a.Hostname, b.Hostname)
+               })
+       }
+
+       // Finally, we can traverse all services and update the ones that have 
aliases
+       for i, s := range allServices {
+               if aliases, f := aliasesForService[s.Hostname]; f {
+                       // This service has an alias; set it. We need to make a 
copy since the underlying Service is shared
+                       s = s.DeepCopy()
+                       s.Attributes.Aliases = aliases
+                       allServices[i] = s
+               }
+       }
+}
+
 func NewReasonStats(reasons ...TriggerReason) ReasonStats {
        ret := make(ReasonStats)
        for _, reason := range reasons {
diff --git a/sail/pkg/model/service.go b/sail/pkg/model/service.go
index 004793cd..0c69d941 100644
--- a/sail/pkg/model/service.go
+++ b/sail/pkg/model/service.go
@@ -11,10 +11,29 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
        "github.com/google/go-cmp/cmp"
+       "istio.io/api/annotation"
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/klog/v2"
+       "strings"
        "sync"
        "time"
 )
 
+// Resolution enumerates the ways a proxy picks the destination for a Service.
+type Resolution int
+
+const (
+       // ClientSideLB implies that the proxy will decide the endpoint from its local lb pool
+       ClientSideLB Resolution = iota
+       // DNSLB implies that the proxy will resolve a DNS address and forward to the resolved address
+       DNSLB
+       // Passthrough implies that the proxy should forward traffic to the destination IP requested by the caller
+       Passthrough
+       // DNSRoundRobinLB implies that the proxy will resolve a DNS address and forward to the resolved address
+       DNSRoundRobinLB
+       // Alias defines a Service that is an alias for another.
+       Alias
+)
+
 type EndpointDiscoverabilityPolicy interface {
        String() string
 }
@@ -61,6 +80,7 @@ type DubboEndpoint struct {
        HealthStatus           HealthStatus
        SendUnhealthyEndpoints bool
        DiscoverabilityPolicy  EndpointDiscoverabilityPolicy `json:"-"`
+       LegacyClusterPortKey   int
 }
 
 func (ep *DubboEndpoint) FirstAddressOrNil() string {
@@ -132,6 +152,66 @@ type ServiceAttributes struct {
        // Namespace is "destination.service.namespace" attribute
        Namespace       string
        ServiceRegistry provider.ID
+       K8sAttributes
+}
+
+// K8sAttributes holds service attributes taken from the Kubernetes Service object.
+type K8sAttributes struct {
+       // Type holds the value of the corev1.Type of the Kubernetes service
+       // spec.Type
+       Type string
+
+       // spec.ExternalName
+       ExternalName string
+
+       // NodeLocal means the proxy will only forward traffic to node local endpoints
+       // spec.InternalTrafficPolicy == Local
+       NodeLocal bool
+
+       // TrafficDistribution determines the service-level traffic distribution.
+       // This may be overridden by locality load balancing settings.
+       TrafficDistribution TrafficDistribution
+
+       // ObjectName is the object name of the underlying object. This may differ from the Service.Attributes.Name for legacy semantics.
+       ObjectName string
+
+       // spec.PublishNotReadyAddresses
+       PublishNotReadyAddresses bool
+}
+
+// TrafficDistribution enumerates the service-level traffic distribution preferences.
+type TrafficDistribution int
+
+const (
+       // TrafficDistributionAny allows any destination
+       TrafficDistributionAny TrafficDistribution = iota
+       // TrafficDistributionPreferSameZone prefers traffic in same zone, failing over to same region and then network.
+       TrafficDistributionPreferSameZone
+       // TrafficDistributionPreferSameNode prefers traffic in same node, failing over to same subzone, then zone, region, and network.
+       TrafficDistributionPreferSameNode
+)
+
+// GetTrafficDistribution derives the TrafficDistribution for a service.
+// A recognized spec value wins; otherwise the traffic-distribution annotation is
+// matched case-insensitively. Anything unrecognized falls back to
+// TrafficDistributionAny, with a warning when the annotation is set but unknown.
+func GetTrafficDistribution(specValue *string, annotations map[string]string) TrafficDistribution {
+       if specValue != nil {
+               spec := *specValue
+               if spec == corev1.ServiceTrafficDistributionPreferSameZone || spec == corev1.ServiceTrafficDistributionPreferClose {
+                       return TrafficDistributionPreferSameZone
+               }
+               if spec == corev1.ServiceTrafficDistributionPreferSameNode {
+                       return TrafficDistributionPreferSameNode
+               }
+       }
+
+       // The TrafficDistribution field is quite new, so a legacy annotation is accepted too.
+       fromAnnotation := strings.ToLower(annotations[annotation.NetworkingTrafficDistribution.Name])
+       if fromAnnotation == "" {
+               return TrafficDistributionAny
+       }
+       if fromAnnotation == strings.ToLower(corev1.ServiceTrafficDistributionPreferClose) ||
+               fromAnnotation == strings.ToLower(corev1.ServiceTrafficDistributionPreferSameZone) {
+               return TrafficDistributionPreferSameZone
+       }
+       if fromAnnotation == strings.ToLower(corev1.ServiceTrafficDistributionPreferSameNode) {
+               return TrafficDistributionPreferSameNode
+       }
+       klog.Warningf("Unknown traffic distribution annotation, defaulting to any")
+       return TrafficDistributionAny
 }
 
 type AddressMap struct {
@@ -151,12 +231,18 @@ func (m *AddressMap) DeepCopy() *AddressMap {
 }
 
 type Service struct {
-       Attributes      ServiceAttributes
-       Hostname        host.Name  `json:"hostname"`
-       Ports           PortList   `json:"ports,omitempty"`
-       ServiceAccounts []string   `json:"serviceAccounts,omitempty"`
-       ClusterVIPs     AddressMap `json:"clusterVIPs,omitempty"`
-       CreationTime    time.Time  `json:"creationTime,omitempty"`
+       Attributes               ServiceAttributes
+       Hostname                 host.Name  `json:"hostname"`
+       Ports                    PortList   `json:"ports,omitempty"`
+       ServiceAccounts          []string   `json:"serviceAccounts,omitempty"`
+       ClusterVIPs              AddressMap `json:"clusterVIPs,omitempty"`
+       CreationTime             time.Time  `json:"creationTime,omitempty"`
+       DefaultAddress           string     `json:"defaultAddress,omitempty"`
+       ResourceVersion          string
+       Resolution               Resolution
+       AutoAllocatedIPv4Address string 
`json:"autoAllocatedIPv4Address,omitempty"`
+       AutoAllocatedIPv6Address string 
`json:"autoAllocatedIPv6Address,omitempty"`
+       MeshExternal             bool
 }
 
 func (s *Service) DeepCopy() *Service {
@@ -247,3 +333,83 @@ func (s *ServiceAttributes) DeepCopy() ServiceAttributes {
        // nolint: govet
        return out
 }
+
+// Equals reports whether s and other match on attributes, ports, service
+// accounts, cluster VIPs, default and auto-allocated addresses, hostname,
+// resolution and MeshExternal. Two nil services are equal; a nil and a non-nil
+// service are not. CreationTime and ResourceVersion are not compared.
+func (s *Service) Equals(other *Service) bool {
+       if s == nil || other == nil {
+               return s == nil && other == nil
+       }
+
+       // Cases are evaluated in order and stop at the first mismatch.
+       switch {
+       case !s.Attributes.Equals(&other.Attributes),
+               !s.Ports.Equals(other.Ports),
+               !slices.Equal(s.ServiceAccounts, other.ServiceAccounts),
+               len(s.ClusterVIPs.Addresses) != len(other.ClusterVIPs.Addresses):
+               return false
+       }
+
+       // Same number of clusters: every cluster in s must exist in other with the same addresses.
+       theirs := other.ClusterVIPs.Addresses
+       for id, mine := range s.ClusterVIPs.Addresses {
+               addrs, found := theirs[id]
+               if !found || !slices.Equal(mine, addrs) {
+                       return false
+               }
+       }
+
+       sameAddresses := s.DefaultAddress == other.DefaultAddress &&
+               s.AutoAllocatedIPv4Address == other.AutoAllocatedIPv4Address &&
+               s.AutoAllocatedIPv6Address == other.AutoAllocatedIPv6Address
+       return sameAddresses && s.Hostname == other.Hostname &&
+               s.Resolution == other.Resolution && s.MeshExternal == other.MeshExternal
+}
+
+// Equals reports whether s and other match on labels, label selectors,
+// ExportTo, aliases, cluster external addresses and ports, name, namespace,
+// service registry and the embedded K8sAttributes. Two nil values are equal;
+// a nil and a non-nil value are not.
+func (s *ServiceAttributes) Equals(other *ServiceAttributes) bool {
+       if s == nil || other == nil {
+               return s == nil && other == nil
+       }
+
+       if !maps.Equal(s.Labels, other.Labels) {
+               return false
+       }
+
+       if !maps.Equal(s.LabelSelectors, other.LabelSelectors) {
+               return false
+       }
+
+       if !maps.Equal(s.ExportTo, other.ExportTo) {
+               return false
+       }
+
+       if !slices.Equal(s.Aliases, other.Aliases) {
+               return false
+       }
+
+       if s.ClusterExternalAddresses.Len() != other.ClusterExternalAddresses.Len() {
+               return false
+       }
+
+       for k, v1 := range s.ClusterExternalAddresses.GetAddresses() {
+               if v2, ok := other.ClusterExternalAddresses.Addresses[k]; !ok || !slices.Equal(v1, v2) {
+                       return false
+               }
+       }
+
+       if len(s.ClusterExternalPorts) != len(other.ClusterExternalPorts) {
+               return false
+       }
+
+       for k, v1 := range s.ClusterExternalPorts {
+               // Look the key up in other: comparing s against itself always
+               // succeeds and would hide differing port maps of equal length.
+               if v2, ok := other.ClusterExternalPorts[k]; !ok || !maps.Equal(v1, v2) {
+                       return false
+               }
+       }
+       return s.Name == other.Name && s.Namespace == other.Namespace &&
+               s.ServiceRegistry == other.ServiceRegistry && s.K8sAttributes == other.K8sAttributes
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/controller.go 
b/sail/pkg/serviceregistry/kube/controller/controller.go
index 3dd79905..839ad194 100644
--- a/sail/pkg/serviceregistry/kube/controller/controller.go
+++ b/sail/pkg/serviceregistry/kube/controller/controller.go
@@ -22,14 +22,22 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/config/host"
        "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
        "github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+       "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
        kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
        "github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+       "github.com/apache/dubbo-kubernetes/pkg/ptr"
        "github.com/apache/dubbo-kubernetes/pkg/queue"
+       "github.com/apache/dubbo-kubernetes/pkg/slices"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/kube"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
        "go.uber.org/atomic"
+       v1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/types"
        "k8s.io/klog/v2"
        "sort"
        "sync"
@@ -47,6 +55,28 @@ type Controller struct {
        servicesMap         map[host.Name]*model.Service
        queue               queue.Instance
        initialSyncTimedout *atomic.Bool
+       configCluster       bool
+       services            kclient.Client[*v1.Service]
+       endpoints           *endpointSliceController
+       meshWatcher         mesh.Watcher
+       handlers            model.ControllerHandlers
+}
+
+// NewController builds a Controller for kubeClient: it creates the event queue
+// and service map, a filtered Service client whose events are routed to
+// onServiceEvent, the endpoint slice controller, and stores the mesh watcher
+// taken from options.
+func NewController(kubeClient kubelib.Client, options Options) *Controller {
+       c := &Controller{
+               opts:                options,
+               client:              kubeClient,
+               configCluster:       options.ConfigCluster,
+               queue:               queue.NewQueueWithID(1*time.Second, string(options.ClusterID)),
+               servicesMap:         make(map[host.Name]*model.Service),
+               initialSyncTimedout: atomic.NewBool(false),
+       }
+
+       serviceFilter := kclient.Filter{ObjectFilter: kubeClient.ObjectFilter()}
+       c.services = kclient.NewFiltered[*v1.Service](kubeClient, serviceFilter)
+       registerHandlers(c, c.services, "Services", c.onServiceEvent, nil)
+
+       c.endpoints = newEndpointSliceController(c)
+       c.meshWatcher = options.MeshWatcher
+       return c
 }
 
 type Options struct {
@@ -63,6 +93,7 @@ type Options struct {
        KrtDebugger           *krt.DebugHandler
        SyncTimeout           time.Duration
        Revision              string
+       ConfigCluster         bool
 }
 
 func (c *Controller) Services() []*model.Service {
@@ -84,6 +115,79 @@ func (c *Controller) GetService(hostname host.Name) 
*model.Service {
        return svc
 }
 
+// onServiceEvent converts the Kubernetes Service in curr to a model.Service and
+// either removes it (delete events) or adds/updates it (all other events).
+// It always returns nil.
+func (c *Controller) onServiceEvent(pre, curr *v1.Service, event model.Event) error {
+       klog.V(2).Infof("Handle event %s for service %s in namespace %s", event, curr.Name, curr.Namespace)
+
+       // Create the standard (cluster.local) service.
+       converted := kube.ConvertService(*curr, c.opts.DomainSuffix, c.Cluster(), c.meshWatcher.Mesh())
+
+       if event == model.EventDelete {
+               c.deleteService(converted)
+               return nil
+       }
+       c.addOrUpdateService(pre, curr, converted, event, false)
+       return nil
+}
+
+// deleteService drops svc from the controller's service map, reports the
+// deletion to the XDS updater, and notifies the service handlers unless the
+// service is exported to nobody (visibility.None).
+func (c *Controller) deleteService(svc *model.Service) {
+       c.Lock()
+       delete(c.servicesMap, svc.Hostname)
+       c.Unlock()
+
+       shard := model.ShardKeyFromRegistry(c)
+       c.opts.XDSUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete)
+
+       if svc.Attributes.ExportTo.Contains(visibility.None) {
+               return
+       }
+       c.handlers.NotifyServiceHandlers(nil, svc, model.EventDelete)
+}
+
+// addOrUpdateService stores currConv in the service map (remembering whatever
+// was stored before), reports the event to the XDS updater, and notifies the
+// service handlers when serviceUpdateNeedsPush says the change matters.
+// The updateEDSCache parameter is not used by this function.
+func (c *Controller) addOrUpdateService(pre, curr *v1.Service, currConv *model.Service, event model.Event, updateEDSCache bool) {
+       hostname := currConv.Hostname
+
+       c.Lock()
+       prevConv := c.servicesMap[hostname]
+       c.servicesMap[hostname] = currConv
+       c.Unlock()
+
+       shard := model.ShardKeyFromRegistry(c)
+       namespace := currConv.Attributes.Namespace
+       c.opts.XDSUpdater.SvcUpdate(shard, string(hostname), namespace, event)
+
+       if !serviceUpdateNeedsPush(pre, curr, prevConv, currConv) {
+               return
+       }
+       klog.V(2).Infof("Service %s in namespace %s updated and needs push", hostname, namespace)
+       c.handlers.NotifyServiceHandlers(prevConv, currConv, event)
+}
+
+// serviceUpdateNeedsPush decides whether a service add/update should be
+// propagated to the service handlers. prev/curr are the raw Kubernetes objects
+// (either may be nil); preConv/currConv are their converted forms (preConv is
+// nil for a new service).
+func serviceUpdateNeedsPush(prev, curr *v1.Service, preConv, currConv *model.Service) bool {
+       currHidden := currConv.Attributes.ExportTo.Contains(visibility.None)
+       switch {
+       case preConv == nil:
+               // New service: push only if it is exported to someone.
+               return !currHidden
+       case currHidden && preConv.Attributes.ExportTo.Contains(visibility.None):
+               // Hidden before and still hidden: nobody can see the change.
+               return false
+       case !preConv.Equals(currConv):
+               // Something we track on model.Service changed.
+               return true
+       case prev == nil || curr == nil:
+               return false
+       }
+
+       // Target ports are not part of model.Service, so compare them on the raw objects.
+       // preConv.Equals(currConv) above already guarantees the number of ports is unchanged.
+       sameTargetPorts := slices.EqualFunc(prev.Spec.Ports, curr.Spec.Ports, func(a, b v1.ServicePort) bool {
+               return a.TargetPort == b.TargetPort
+       })
+       return !sameTargetPorts
+}
+
 func (c *Controller) Provider() provider.ID {
        return provider.Kubernetes
 }
@@ -121,3 +225,51 @@ func (c *Controller) HasSynced() bool {
 // informersSynced currently reports true unconditionally; it does not inspect
 // any informer state.
 func (c *Controller) informersSynced() bool {
         return true
 }
+
+// FilterOutFunc decides whether an update event should be dropped. When it
+// returns true for (old, cur), registerHandlers does not enqueue the update.
+type FilterOutFunc[T controllers.Object] func(old, cur T) bool
+
+// registerHandlers subscribes handler to the add, update and delete events of
+// informer. Every event is pushed onto the controller queue instead of being
+// handled inline.
+//
+// For add and update events the object is re-read from the informer right
+// before handler runs. Delete events are passed to handler as received.
+// filter may be nil; when set it is consulted for update events only.
+// otype is currently unused.
+func registerHandlers[T controllers.ComparableObject](c *Controller,
+       informer kclient.Informer[T], otype string,
+       handler func(T, T, model.Event) error, filter FilterOutFunc[T],
+) {
+       // refreshAndHandle looks up the latest version of the object and skips
+       // the event when the object is no longer present.
+       refreshAndHandle := func(prev, curr T, event model.Event) error {
+               latest := informer.Get(curr.GetName(), curr.GetNamespace())
+               if controllers.IsNil(latest) {
+                       // this can happen when an immediate delete after update
+                       // the delete event can be handled later
+                       return nil
+               }
+               return handler(prev, latest, event)
+       }
+
+       onAdd := func(obj T) {
+               c.queue.Push(func() error {
+                       return refreshAndHandle(ptr.Empty[T](), obj, model.EventAdd)
+               })
+       }
+       onUpdate := func(old, cur T) {
+               if filter != nil && filter(old, cur) {
+                       return
+               }
+               c.queue.Push(func() error {
+                       return refreshAndHandle(old, cur, model.EventUpdate)
+               })
+       }
+       onDelete := func(obj T) {
+               c.queue.Push(func() error {
+                       return handler(ptr.Empty[T](), obj, model.EventDelete)
+               })
+       }
+
+       informer.AddEventHandler(controllers.EventHandler[T]{
+               AddFunc:    onAdd,
+               UpdateFunc: onUpdate,
+               DeleteFunc: onDelete,
+       })
+}
+
+// servicesForNamespacedName returns the model.Service registered under the
+// Kubernetes service hostname derived from name, wrapped in a slice, or nil
+// when GetService finds no such service.
+func (c *Controller) servicesForNamespacedName(name types.NamespacedName) []*model.Service {
+       hostname := kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix)
+       svc := c.GetService(hostname)
+       if svc == nil {
+               return nil
+       }
+       return []*model.Service{svc}
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/endpointslice.go b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
new file mode 100644
index 00000000..3e240cc9
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
@@ -0,0 +1,120 @@
+package controller
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
+       "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "istio.io/api/annotation"
+       corev1 "k8s.io/api/core/v1"
+       v1 "k8s.io/api/discovery/v1"
+       klabels "k8s.io/apimachinery/pkg/labels"
+       "k8s.io/apimachinery/pkg/selection"
+       mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
+       "strings"
+)
+
+// endpointSliceRequirement matches label sets that do not carry the MCS
+// service-name label (mcs.LabelServiceName).
+var endpointSliceRequirement = labelRequirement(mcs.LabelServiceName, selection.DoesNotExist, nil)
+
+// endpointSliceSelector is the label selector built from
+// endpointSliceRequirement.
+var endpointSliceSelector = klabels.NewSelector().Add(*endpointSliceRequirement)
+
+// endpointSliceController handles EndpointSlice events on behalf of a
+// Controller.
+type endpointSliceController struct {
+       // c is the owning service registry controller.
+       c *Controller
+       // slices is the client used to watch EndpointSlice objects.
+       slices kclient.Client[*v1.EndpointSlice]
+}
+
+// newEndpointSliceController builds an endpointSliceController for c. It
+// creates a filtered EndpointSlice client using the object filter of c.client
+// and registers the controller's onEvent method as the event handler.
+func newEndpointSliceController(c *Controller) *endpointSliceController {
+       filter := kclient.Filter{ObjectFilter: c.client.ObjectFilter()}
+       esc := &endpointSliceController{
+               c:      c,
+               slices: kclient.NewFiltered[*v1.EndpointSlice](c.client, filter),
+       }
+       registerHandlers[*v1.EndpointSlice](c, esc.slices, "EndpointSlice", esc.onEvent, nil)
+       return esc
+}
+
+// onEvent adapts onEventInternal to the handler signature expected by
+// registerHandlers. The previous object is ignored and the returned error is
+// always nil.
+func (esc *endpointSliceController) onEvent(_, ep *v1.EndpointSlice, event model.Event) error {
+       esc.onEventInternal(nil, ep, event)
+       return nil
+}
+
+// onEventInternal reacts to an EndpointSlice event. It requests a full push
+// only when the slice belongs to a known headless Service (ClusterIP "None",
+// not of type ExternalName) that serviceNeedsPush accepts; every other event
+// is ignored. Slices that do not match endpointSliceSelector are skipped.
+//
+// The first argument (previous object) and event are currently unused.
+func (esc *endpointSliceController) onEventInternal(_, ep *v1.EndpointSlice, event model.Event) {
+       esLabels := ep.GetLabels()
+       if !endpointSliceSelector.Matches(klabels.Set(esLabels)) {
+               return
+       }
+
+       // TODO: updating the internal endpoint cache for the slice
+       // (delete/update) and triggering an EDS push for the affected hostnames
+       // are not wired up yet.
+
+       // Now check if we need to do a full push for the service.
+       // If the service is headless, we need to do a full push if service exposes TCP ports
+       // to create IP based listeners. For pure HTTP headless services, we only need to push NDS.
+       svc := esc.c.services.Get(serviceNameForEndpointSlice(esLabels), ep.GetNamespace())
+       if svc == nil {
+               return
+       }
+       if !serviceNeedsPush(svc) {
+               return
+       }
+       if svc.Spec.ClusterIP != corev1.ClusterIPNone || svc.Spec.Type == corev1.ServiceTypeExternalName {
+               return
+       }
+
+       configsUpdated := sets.New[model.ConfigKey]()
+       for _, modelSvc := range esc.c.servicesForNamespacedName(config.NamespacedName(svc)) {
+               // Classify each service on its own: a non-HTTP port on one service
+               // must not change how the next service is classified.
+               supportsOnlyHTTP := true
+               for _, p := range modelSvc.Ports {
+                       if !p.Protocol.IsHTTP() {
+                               supportsOnlyHTTP = false
+                               break
+                       }
+               }
+
+               updatedKind := kind.ServiceEntry
+               if supportsOnlyHTTP {
+                       // pure HTTP headless services should not need a full push since they do not
+                       // require a Listener based on IP: https://github.com/istio/istio/issues/48207
+                       updatedKind = kind.DNSName
+               }
+               configsUpdated.Insert(model.ConfigKey{Kind: updatedKind, Name: modelSvc.Hostname.String(), Namespace: svc.Namespace})
+       }
+
+       if len(configsUpdated) > 0 {
+               esc.c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
+                       Full:           true,
+                       ConfigsUpdated: configsUpdated,
+                       Reason:         model.NewReasonStats(model.HeadlessEndpointUpdate),
+               })
+       }
+}
+
+// serviceNameForEndpointSlice returns the value of the v1.LabelServiceName
+// label, or the empty string when the label is absent.
+func serviceNameForEndpointSlice(labels map[string]string) string {
+       if name, ok := labels[v1.LabelServiceName]; ok {
+               return name
+       }
+       return ""
+}
+
+// serviceNeedsPush reports whether endpoint changes of svc should trigger a
+// push. It returns false only when the comma-separated export-to annotation
+// (annotation.NetworkingExportTo) contains visibility.None; entries are
+// compared after trimming surrounding whitespace.
+func serviceNeedsPush(svc *corev1.Service) bool {
+       exportTo := svc.Annotations[annotation.NetworkingExportTo.Name]
+       if exportTo == "" {
+               return true
+       }
+       for _, ns := range strings.Split(exportTo, ",") {
+               if strings.TrimSpace(ns) == string(visibility.None) {
+                       return false
+               }
+       }
+       return true
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/multicluster.go b/sail/pkg/serviceregistry/kube/controller/multicluster.go
new file mode 100644
index 00000000..cad7a102
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/multicluster.go
@@ -0,0 +1,94 @@
+package controller
+
+import (
+       kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/multicluster"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/server"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+       
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/serviceentry"
+       "k8s.io/klog/v2"
+)
+
+// kubeController bundles the Kubernetes registry Controller created for one
+// cluster with the aggregate controller it is registered in and the channel
+// used to stop it.
+type kubeController struct {
+       *Controller
+       MeshServiceController   *aggregate.Controller
+       workloadEntryController *serviceentry.Controller
+       // stop is closed by Close.
+       stop chan struct{}
+}
+
+// Close closes the stop channel. It must be called at most once, since
+// closing an already closed channel panics.
+func (k *kubeController) Close() {
+       close(k.stop)
+}
+
+// Multicluster creates one kubeController per cluster through a
+// multicluster.Component and holds the settings shared by all of them.
+type Multicluster struct {
+       // serverID of this pilot instance used for leader election
+       serverID string
+       // revision is stored as passed to NewMulticluster.
+       revision string
+
+       // options to use when creating kube controllers
+       opts Options
+
+       s            server.Instance
+       clusterLocal model.ClusterLocalProvider
+
+       distributeCACert bool
+       caBundleWatcher  *keycertbundle.Watcher
+
+       // component builds a kubeController for each cluster it is given.
+       component *multicluster.Component[*kubeController]
+}
+
+// NewMulticluster creates a Multicluster and registers a per-cluster builder
+// with controller. For each cluster the builder copies opts, overrides
+// ClusterID and ConfigCluster, creates a registry with NewController, and
+// passes it to initializeCluster.
+func NewMulticluster(
+       serverID string,
+       opts Options,
+       caBundleWatcher *keycertbundle.Watcher,
+       revision string,
+       distributeCACert bool,
+       clusterLocal model.ClusterLocalProvider,
+       s server.Instance,
+       controller *multicluster.Controller,
+) *Multicluster {
+       mc := &Multicluster{
+               serverID:         serverID,
+               opts:             opts,
+               s:                s,
+               clusterLocal:     clusterLocal,
+               distributeCACert: distributeCACert,
+               caBundleWatcher:  caBundleWatcher,
+               revision:         revision,
+       }
+
+       buildForCluster := func(cluster *multicluster.Cluster) *kubeController {
+               // The config cluster is the one whose ID matches the ID in opts.
+               isConfigCluster := opts.ClusterID == cluster.ID
+
+               clusterOpts := opts
+               clusterOpts.ClusterID = cluster.ID
+               klog.Infof("Initializing Kubernetes service registry %q", clusterOpts.ClusterID)
+               clusterOpts.ConfigCluster = isConfigCluster
+
+               stopCh := make(chan struct{})
+               registry := NewController(cluster.Client, clusterOpts)
+               kc := &kubeController{
+                       MeshServiceController: opts.MeshServiceController,
+                       Controller:            registry,
+                       stop:                  stopCh,
+               }
+               mc.initializeCluster(cluster, kc, registry, clusterOpts, isConfigCluster, stopCh)
+               return kc
+       }
+       mc.component = multicluster.BuildMultiClusterComponent(controller, buildForCluster)
+
+       return mc
+}
+
+// initializeCluster adds kubeRegistry to the aggregate mesh service controller
+// and runs it with clusterStopCh as its stop channel. The remaining
+// parameters are currently unused.
+func (m *Multicluster) initializeCluster(cluster *multicluster.Cluster, kubeController *kubeController, kubeRegistry *Controller,
+       options Options, configCluster bool, clusterStopCh <-chan struct{},
+) {
+       // run after WorkloadHandler is added
+       meshServices := m.opts.MeshServiceController
+       meshServices.AddRegistryAndRun(kubeRegistry, clusterStopCh)
+}
+
+func (m *Multicluster) checkShouldLead(client kubelib.Client, systemNamespace 
string, stop <-chan struct{}) bool {
+       var res bool
+       return res
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/util.go b/sail/pkg/serviceregistry/kube/controller/util.go
new file mode 100644
index 00000000..f8c5ecaf
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/util.go
@@ -0,0 +1,16 @@
+package controller
+
+import (
+       "fmt"
+       klabels "k8s.io/apimachinery/pkg/labels"
+       "k8s.io/apimachinery/pkg/selection"
+       "k8s.io/apimachinery/pkg/util/validation/field"
+)
+
+func labelRequirement(key string, op selection.Operator, vals []string, opts 
...field.PathOption) *klabels.Requirement {
+       out, err := klabels.NewRequirement(key, op, vals, opts...)
+       if err != nil {
+               panic(fmt.Sprintf("failed creating requirements for Service: 
%v", err))
+       }
+       return out
+}
diff --git a/sail/pkg/serviceregistry/kube/conversion.go b/sail/pkg/serviceregistry/kube/conversion.go
new file mode 100644
index 00000000..1d616bd1
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/conversion.go
@@ -0,0 +1,152 @@
+package kube
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/cluster"
+       "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+       "github.com/apache/dubbo-kubernetes/pkg/config/host"
+       "github.com/apache/dubbo-kubernetes/pkg/config/kube"
+       "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+       "github.com/apache/dubbo-kubernetes/pkg/spiffe"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+       "istio.io/api/annotation"
+       meshconfig "istio.io/api/mesh/v1alpha1"
+       corev1 "k8s.io/api/core/v1"
+       "strings"
+)
+
+// ServiceHostname builds the hostname of a Kubernetes service in the form
+// "<name>.<namespace>.svc.<domainSuffix>".
+func ServiceHostname(name, namespace, domainSuffix string) host.Name {
+       fqdn := name + "." + namespace + ".svc." + domainSuffix
+       return host.Name(fqdn)
+}
+
+// ConvertService translates a Kubernetes Service into a model.Service for the
+// cluster identified by clusterID.
+//
+// Resolution is model.ClientSideLB by default, model.Alias for ExternalName
+// services with a non-empty external name, and model.Passthrough for headless
+// services (ClusterIP "None"). Addresses default to constants.UnspecifiedIP
+// and are replaced by the cluster IP(s) when one is assigned. Service
+// accounts and export-to visibility are read from annotations. External
+// ports and addresses are filled in for NodePort and LoadBalancer services
+// and for spec.externalIPs.
+func ConvertService(svc corev1.Service, domainSuffix string, clusterID cluster.ID, mesh *meshconfig.MeshConfig) *model.Service {
+       spec := &svc.Spec
+
+       resolution := model.ClientSideLB
+       externalName := ""
+       if spec.Type == corev1.ServiceTypeExternalName && spec.ExternalName != "" {
+               externalName = spec.ExternalName
+               resolution = model.Alias
+       }
+
+       addrs := []string{constants.UnspecifiedIP}
+       switch spec.ClusterIP {
+       case corev1.ClusterIPNone:
+               // headless services should not be load balanced
+               resolution = model.Passthrough
+       case "":
+               // No cluster IP assigned: keep the unspecified address.
+       default:
+               if len(spec.ClusterIPs) > 1 {
+                       addrs = spec.ClusterIPs
+               } else {
+                       addrs = []string{spec.ClusterIP}
+               }
+       }
+
+       nodeLocal := spec.InternalTrafficPolicy != nil &&
+               *spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyLocal
+
+       ports := make([]*model.Port, len(spec.Ports))
+       for i := range spec.Ports {
+               ports[i] = convertPort(spec.Ports[i])
+       }
+
+       // Service accounts come from two comma-separated annotations: canonical
+       // accounts are used verbatim, Kubernetes accounts are converted first.
+       serviceAccounts := []string{}
+       if canonical := svc.Annotations[annotation.AlphaCanonicalServiceAccounts.Name]; canonical != "" {
+               serviceAccounts = append(serviceAccounts, strings.Split(canonical, ",")...)
+       }
+       if kubeAccounts := svc.Annotations[annotation.AlphaKubernetesServiceAccounts.Name]; kubeAccounts != "" {
+               for _, ksa := range strings.Split(kubeAccounts, ",") {
+                       serviceAccounts = append(serviceAccounts, kubeToDubboServiceAccount(ksa, svc.Namespace, mesh))
+               }
+       }
+
+       // exportTo stays nil when the annotation is absent or empty.
+       var exportTo sets.Set[visibility.Instance]
+       if rawExportTo := svc.Annotations[annotation.NetworkingExportTo.Name]; rawExportTo != "" {
+               namespaces := strings.Split(rawExportTo, ",")
+               exportTo = sets.NewWithLength[visibility.Instance](len(namespaces))
+               for _, ns := range namespaces {
+                       exportTo.Insert(visibility.Instance(strings.TrimSpace(ns)))
+               }
+       }
+
+       dubboService := &model.Service{
+               Hostname: ServiceHostname(svc.Name, svc.Namespace, domainSuffix),
+               ClusterVIPs: model.AddressMap{
+                       Addresses: map[cluster.ID][]string{
+                               clusterID: addrs,
+                       },
+               },
+               Ports:           ports,
+               DefaultAddress:  addrs[0],
+               Resolution:      resolution,
+               ServiceAccounts: serviceAccounts,
+               ResourceVersion: svc.ResourceVersion,
+               Attributes: model.ServiceAttributes{
+                       ServiceRegistry:          provider.Kubernetes,
+                       Name:                     svc.Name,
+                       Namespace:                svc.Namespace,
+                       Labels:                   svc.Labels,
+                       ExportTo:                 exportTo,
+                       LabelSelectors:           spec.Selector,
+                       Type:                     string(spec.Type),
+                       ExternalName:             externalName,
+                       TrafficDistribution:      model.GetTrafficDistribution(spec.TrafficDistribution, svc.Annotations),
+                       NodeLocal:                nodeLocal,
+                       PublishNotReadyAddresses: spec.PublishNotReadyAddresses,
+               },
+       }
+       attrs := &dubboService.Attributes
+
+       switch spec.Type {
+       case corev1.ServiceTypeNodePort:
+               // Node ports are only recorded when the node-selector annotation is set.
+               if _, ok := svc.Annotations[annotation.TrafficNodeSelector.Name]; ok {
+                       // store the service port to node port mappings
+                       nodePorts := make(map[uint32]uint32)
+                       for _, p := range spec.Ports {
+                               nodePorts[uint32(p.Port)] = uint32(p.NodePort)
+                       }
+                       attrs.ClusterExternalPorts = map[cluster.ID]map[uint32]uint32{clusterID: nodePorts}
+                       // address mappings will be done elsewhere
+               }
+       case corev1.ServiceTypeLoadBalancer:
+               var lbAddrs []string
+               for _, ingress := range svc.Status.LoadBalancer.Ingress {
+                       switch {
+                       case len(ingress.IP) > 0:
+                               lbAddrs = append(lbAddrs, ingress.IP)
+                       case len(ingress.Hostname) > 0:
+                               // DO NOT resolve the DNS here. In environments like AWS, the ELB hostname
+                               // does not have a repeatable DNS address and IPs resolved at an earlier point
+                               // in time may not work. So, when we get just hostnames instead of IPs, we need
+                               // to smartly switch from EDS to strict_dns rather than doing the naive thing of
+                               // resolving the DNS name and hoping the resolution is one-time task.
+                               lbAddrs = append(lbAddrs, ingress.Hostname)
+                       }
+               }
+               if len(lbAddrs) > 0 {
+                       if attrs.ClusterExternalAddresses == nil {
+                               attrs.ClusterExternalAddresses = &model.AddressMap{}
+                       }
+                       attrs.ClusterExternalAddresses.SetAddressesFor(clusterID, lbAddrs)
+               }
+       }
+
+       if len(spec.ExternalIPs) > 0 {
+               if attrs.ClusterExternalAddresses == nil {
+                       attrs.ClusterExternalAddresses = &model.AddressMap{}
+               }
+               attrs.ClusterExternalAddresses.AddAddressesFor(clusterID, spec.ExternalIPs)
+       }
+       return dubboService
+}
+
+// convertPort maps a Kubernetes ServicePort to a model.Port, deriving the
+// protocol through kube.ConvertProtocol from the port number, name, transport
+// protocol and app protocol.
+func convertPort(port corev1.ServicePort) *model.Port {
+       protocol := kube.ConvertProtocol(port.Port, port.Name, port.Protocol, port.AppProtocol)
+       return &model.Port{
+               Name:     port.Name,
+               Port:     int(port.Port),
+               Protocol: protocol,
+       }
+}
+
+// kubeToDubboServiceAccount returns the SPIFFE URI that
+// spiffe.MustGenSpiffeURI produces for service account saname in namespace ns.
+// NOTE(review): the "Must" prefix suggests the helper may panic on invalid
+// input - confirm against the spiffe package.
+func kubeToDubboServiceAccount(saname string, ns string, mesh *meshconfig.MeshConfig) string {
+       uri := spiffe.MustGenSpiffeURI(mesh, ns, saname)
+       return uri
+}
diff --git a/sail/pkg/serviceregistry/serviceentry/controller.go b/sail/pkg/serviceregistry/serviceentry/controller.go
new file mode 100644
index 00000000..5d116e7c
--- /dev/null
+++ b/sail/pkg/serviceregistry/serviceentry/controller.go
@@ -0,0 +1,4 @@
+package serviceentry
+
+type Controller struct {
+}
diff --git a/sail/pkg/xds/eds.go b/sail/pkg/xds/eds.go
index 811dacd7..d2afda10 100644
--- a/sail/pkg/xds/eds.go
+++ b/sail/pkg/xds/eds.go
@@ -1,23 +1,12 @@
 package xds
 
-import (
-       "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
-       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
-       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
-)
+import "github.com/apache/dubbo-kubernetes/sail/pkg/model"
 
-// TODO EDS
-func (s *DiscoveryServer) EDSUpdate(shard model.ShardKey, serviceName string, 
namespace string,
-       dubboEndpoints []*model.DubboEndpoint,
-) {
-       // Update the endpoint shards
-       pushType := s.Env.EndpointIndex.UpdateServiceEndpoints(shard, 
serviceName, namespace, dubboEndpoints, true)
-       if pushType == model.IncrementalPush || pushType == model.FullPush {
-               // Trigger a push
-               s.ConfigUpdate(&model.PushRequest{
-                       Full:           pushType == model.FullPush,
-                       ConfigsUpdated: sets.New(model.ConfigKey{Kind: 
kind.ServiceEntry, Name: serviceName, Namespace: namespace}),
-                       Reason:         
model.NewReasonStats(model.EndpointUpdate),
-               })
+// SvcUpdate is called when a service in the given shard is added, updated or
+// deleted. Only deletions are acted upon: the service's endpoint shard is
+// removed from the EndpointIndex. All other events are currently no-ops.
+func (s *DiscoveryServer) SvcUpdate(shard model.ShardKey, hostname string, namespace string, event model.Event) {
+       // When a service deleted, we should cleanup the endpoint shards and also remove keys from EndpointIndex to
+       // prevent memory leaks.
+       if event == model.EventDelete {
+               // NOTE(review): the meaning of the trailing false argument is not visible here - confirm against DeleteServiceShard.
+               s.Env.EndpointIndex.DeleteServiceShard(shard, hostname, namespace, false)
        }
 }

Reply via email to