This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new e01c5471 [discovery] full push service xds update (#803)
e01c5471 is described below

commit e01c54719e31e9f602835909b4d86175ed106374
Author: Jian Zhong <[email protected]>
AuthorDate: Thu Oct 16 15:26:38 2025 +0800

    [discovery] full push service xds update (#803)
---
 Dockerfile                                         |   2 +-
 pkg/config/model.go                                |   5 +-
 pkg/config/schema/gvk/resources.go                 |   3 +
 pkg/config/schema/gvr/resources.go                 |   3 +
 pkg/config/schema/kubeclient/resources.go          |  11 +-
 pkg/config/schema/kubetypes/resources.go           |   2 +
 pkg/kube/inject/inject.go                          | 236 ++++++++++++++++
 pkg/kube/inject/watcher.go                         |  14 +-
 pkg/kube/inject/webhook.go                         |  73 ++++-
 pkg/kube/util.go                                   |  35 +++
 pkg/webhooks/validation/controller/controller.go   |   3 +-
 pkg/webhooks/webhookpatch.go                       |   3 +-
 sail/pkg/bootstrap/server.go                       |   2 -
 sail/pkg/model/push_context.go                     |   8 +-
 sail/pkg/model/service.go                          |  28 +-
 .../serviceregistry/kube/controller/controller.go  | 154 ++++++++++-
 .../kube/controller/endpoint_builder.go            |  97 +++++++
 .../kube/controller/endpointslice.go               | 296 +++++++++++++++++++--
 .../kube/controller/multicluster.go                |   4 +-
 .../pkg/serviceregistry/kube/controller/network.go |  56 ++++
 sail/pkg/serviceregistry/kube/controller/pod.go    | 252 ++++++++++++++++++
 sail/pkg/serviceregistry/kube/controller/util.go   |  13 +
 sail/pkg/serviceregistry/kube/conversion.go        |   4 +
 .../pkg/serviceregistry/serviceentry/controller.go |   4 -
 sail/pkg/serviceregistry/util/label/label.go       |  60 +++++
 sail/pkg/xds/ads.go                                |   6 +-
 sail/pkg/xds/discovery.go                          |  40 +++
 sail/pkg/xds/eds.go                                |  30 ++-
 sail/pkg/xds/xdsgen.go                             |   1 +
 29 files changed, 1364 insertions(+), 81 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 63b52fd0..d7f227e1 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -4,4 +4,4 @@ ENTRYPOINT ["./sail-discovery"]
 
 FROM gcr.io/distroless/static:debug AS agent
 COPY sail-agent .
-ENTRYPOINT ["./sail-agent"]
+ENTRYPOINT ["./sail-agent"]
\ No newline at end of file
diff --git a/pkg/config/model.go b/pkg/config/model.go
index 204feb77..71ad19d5 100644
--- a/pkg/config/model.go
+++ b/pkg/config/model.go
@@ -44,7 +44,6 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/sail/pkg/util/protoconv"
-       "istio.io/api/label"
 )
 
 // Meta is metadata attached to each configuration unit.
@@ -132,7 +131,7 @@ func (o ObjectWithCluster[T]) GetObjectKeyable() any {
 }
 
 func LabelsInRevision(lbls map[string]string, rev string) bool {
-       configEnv, f := lbls[label.IoIstioRev.Name]
+       configEnv, f := lbls["dubbo.io/rev"]
        if !f {
                // This is a global object, and always included
                return true
@@ -150,7 +149,7 @@ func LabelsInRevisionOrTags(lbls map[string]string, rev 
string, tags sets.Set[st
        if LabelsInRevision(lbls, rev) {
                return true
        }
-       configEnv := lbls[label.IoIstioRev.Name]
+       configEnv := lbls["dubbo.io/rev"]
        // Otherwise, only return true if revisions equal
        return tags.Contains(configEnv)
 }
diff --git a/pkg/config/schema/gvk/resources.go 
b/pkg/config/schema/gvk/resources.go
index 36dc2a91..b093fc04 100644
--- a/pkg/config/schema/gvk/resources.go
+++ b/pkg/config/schema/gvk/resources.go
@@ -35,6 +35,7 @@ var (
        ConfigMap                      = config.GroupVersionKind{Group: "", 
Version: "v1", Kind: "ConfigMap"}
        Secret                         = config.GroupVersionKind{Group: "", 
Version: "v1", Kind: "Secret"}
        Service                        = config.GroupVersionKind{Group: "", 
Version: "v1", Kind: "Service"}
+       Pod                            = config.GroupVersionKind{Group: "", 
Version: "v1", Kind: "Pod"}
        ServiceAccount                 = config.GroupVersionKind{Group: "", 
Version: "v1", Kind: "ServiceAccount"}
        MeshConfig                     = config.GroupVersionKind{Group: "", 
Version: "v1alpha1", Kind: "MeshConfig"}
        RequestAuthentication          = config.GroupVersionKind{Group: 
"security.dubbo.io", Version: "v1", Kind: "RequestAuthentication"}
@@ -88,6 +89,8 @@ func ToGVR(g config.GroupVersionKind) 
(schema.GroupVersionResource, bool) {
                return gvr.EndpointSlice, true
        case Endpoints:
                return gvr.Endpoints, true
+       case Pod:
+               return gvr.Pod, true
        }
        return schema.GroupVersionResource{}, false
 }
diff --git a/pkg/config/schema/gvr/resources.go 
b/pkg/config/schema/gvr/resources.go
index 86f1d054..48d27b81 100644
--- a/pkg/config/schema/gvr/resources.go
+++ b/pkg/config/schema/gvr/resources.go
@@ -42,6 +42,7 @@ var (
        VirtualService                 = schema.GroupVersionResource{Group: 
"networking.dubbo.io", Version: "v1", Resource: "virtualservices"}
        EndpointSlice                  = schema.GroupVersionResource{Group: 
"discovery.k8s.io", Version: "v1", Resource: "endpointslices"}
        Endpoints                      = schema.GroupVersionResource{Group: "", 
Version: "v1", Resource: "endpoints"}
+       Pod                            = schema.GroupVersionResource{Group: "", 
Version: "v1", Resource: "pods"}
 )
 
 func IsClusterScoped(g schema.GroupVersionResource) bool {
@@ -80,6 +81,8 @@ func IsClusterScoped(g schema.GroupVersionResource) bool {
                return false
        case Endpoints:
                return false
+       case Pod:
+               return false
        }
        return false
 }
diff --git a/pkg/config/schema/kubeclient/resources.go 
b/pkg/config/schema/kubeclient/resources.go
index ff003d02..de7912eb 100644
--- a/pkg/config/schema/kubeclient/resources.go
+++ b/pkg/config/schema/kubeclient/resources.go
@@ -78,7 +78,6 @@ func GetWriteClient[T runtime.Object](c ClientGetter, 
namespace string) ktypes.W
                return 
c.Dubbo().NetworkingV1().VirtualServices(namespace).(ktypes.WriteAPI[T])
        case *apiistioioapinetworkingv1.DestinationRule:
                return 
c.Dubbo().NetworkingV1().DestinationRules(namespace).(ktypes.WriteAPI[T])
-
        default:
                panic(fmt.Sprintf("Unknown type %T", ptr.Empty[T]()))
        }
@@ -108,6 +107,8 @@ func gvrToObject(g schema.GroupVersionResource) 
runtime.Object {
                return &k8sioapicorev1.ServiceAccount{}
        case gvr.StatefulSet:
                return &k8sioapiappsv1.StatefulSet{}
+       case gvr.Pod:
+               return &k8sioapicorev1.Pod{}
        case gvr.MutatingWebhookConfiguration:
                return 
&k8sioapiadmissionregistrationv1.MutatingWebhookConfiguration{}
        case gvr.ValidatingWebhookConfiguration:
@@ -120,6 +121,7 @@ func gvrToObject(g schema.GroupVersionResource) 
runtime.Object {
                return &apiistioioapinetworkingv1.VirtualService{}
        case gvr.DestinationRule:
                return &apiistioioapinetworkingv1.DestinationRule{}
+
        default:
                panic(fmt.Sprintf("Unknown type %v", g))
        }
@@ -249,6 +251,13 @@ func getInformerFiltered(c ClientGetter, opts 
ktypes.InformerOptions, g schema.G
                w = func(options metav1.ListOptions) (watch.Interface, error) {
                        return 
c.Dubbo().SecurityV1().RequestAuthentications(opts.Namespace).Watch(context.Background(),
 options)
                }
+       case gvr.Pod:
+               l = func(options metav1.ListOptions) (runtime.Object, error) {
+                       return 
c.Kube().CoreV1().Pods(opts.Namespace).List(context.Background(), options)
+               }
+               w = func(options metav1.ListOptions) (watch.Interface, error) {
+                       return 
c.Kube().CoreV1().Pods(opts.Namespace).Watch(context.Background(), options)
+               }
        default:
                panic(fmt.Sprintf("Unknown type %v", g))
        }
diff --git a/pkg/config/schema/kubetypes/resources.go 
b/pkg/config/schema/kubetypes/resources.go
index 562e8a54..bbe30e9b 100644
--- a/pkg/config/schema/kubetypes/resources.go
+++ b/pkg/config/schema/kubetypes/resources.go
@@ -56,6 +56,8 @@ func getGvk(obj any) (config.GroupVersionKind, bool) {
                return gvk.Endpoints, true
        case *k8sioapicorev1.Service:
                return gvk.Service, true
+       case *k8sioapicorev1.Pod:
+               return gvk.Pod, true
        default:
                return config.GroupVersionKind{}, false
        }
diff --git a/pkg/kube/inject/inject.go b/pkg/kube/inject/inject.go
index d2042f99..0989dae8 100644
--- a/pkg/kube/inject/inject.go
+++ b/pkg/kube/inject/inject.go
@@ -1,12 +1,21 @@
 package inject
 
 import (
+       "bytes"
+       "encoding/json"
        "fmt"
        "github.com/Masterminds/sprig/v3"
+       opconfig "github.com/apache/dubbo-kubernetes/operator/pkg/apis"
+       common_features "github.com/apache/dubbo-kubernetes/pkg/features"
+       "istio.io/api/annotation"
+       meshconfig "istio.io/api/mesh/v1alpha1"
+       proxyConfig "istio.io/api/networking/v1beta1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/types"
        "k8s.io/klog/v2"
        "sigs.k8s.io/yaml"
+       "strings"
        "text/template"
 )
 
@@ -24,6 +33,29 @@ type Config struct {
        Aliases             map[string][]string `json:"aliases"`
 }
 
+type ProxylessTemplateData struct {
+       TypeMeta                 metav1.TypeMeta
+       DeploymentMeta           types.NamespacedName
+       ObjectMeta               metav1.ObjectMeta
+       Spec                     corev1.PodSpec
+       ProxyConfig              *meshconfig.ProxyConfig
+       MeshConfig               *meshconfig.MeshConfig
+       Values                   map[string]any
+       Revision                 string
+       NativeSidecars           bool
+       ProxyImage               string
+       InboundTrafficPolicyMode string
+       CompliancePolicy         string
+}
+
+type ProxylessInjectionStatus struct {
+       InitContainers   []string `json:"initContainers"`
+       Containers       []string `json:"containers"`
+       Volumes          []string `json:"volumes"`
+       ImagePullSecrets []string `json:"imagePullSecrets"`
+       Revision         string   `json:"revision"`
+}
+
 func UnmarshalConfig(yml []byte) (Config, error) {
        var injectConfig Config
        if err := yaml.Unmarshal(yml, &injectConfig); err != nil {
@@ -61,5 +93,209 @@ func potentialPodName(metadata metav1.ObjectMeta) string {
 }
 
 func RunTemplate(params InjectionParameters) (mergedPod *corev1.Pod, 
templatePod *corev1.Pod, err error) {
+       meshConfig := params.meshConfig
+
+       strippedPod, err := reinsertOverrides(stripPod(params))
+       if err != nil {
+               return nil, nil, err
+       }
+
+       data := ProxylessTemplateData{
+               TypeMeta:         params.typeMeta,
+               DeploymentMeta:   params.deployMeta,
+               ObjectMeta:       strippedPod.ObjectMeta,
+               Spec:             strippedPod.Spec,
+               ProxyConfig:      params.proxyConfig,
+               MeshConfig:       meshConfig,
+               Values:           params.valuesConfig.asMap,
+               Revision:         params.revision,
+               ProxyImage:       ProxyImage(params.valuesConfig.asStruct, 
params.proxyConfig.Image, strippedPod.Annotations),
+               CompliancePolicy: common_features.CompliancePolicy,
+       }
+
+       mergedPod = params.pod
+       templatePod = &corev1.Pod{}
+
+       for _, templateName := range selectTemplates(params) {
+               parsedTemplate, f := params.templates[templateName]
+               if !f {
+                       return nil, nil, fmt.Errorf("requested template %q not 
found; have %v",
+                               templateName, 
strings.Join(knownTemplates(params.templates), ", "))
+               }
+               bbuf, err := runTemplate(parsedTemplate, data)
+               if err != nil {
+                       return nil, nil, err
+               }
+               templatePod, err = applyOverlayYAML(templatePod, bbuf.Bytes())
+               if err != nil {
+                       return nil, nil, fmt.Errorf("failed applying injection 
overlay: %v", err)
+               }
+               mergedPod, err = applyOverlayYAML(mergedPod, bbuf.Bytes())
+               if err != nil {
+                       return nil, nil, fmt.Errorf("failed parsing generated 
injected YAML (check Istio sidecar injector configuration): %v", err)
+               }
+       }
+
        return mergedPod, templatePod, nil
 }
+
+func knownTemplates(t Templates) []string {
+       keys := make([]string, 0, len(t))
+       for k := range t {
+               keys = append(keys, k)
+       }
+       return keys
+}
+
+func runTemplate(tmpl *template.Template, data ProxylessTemplateData) 
(bytes.Buffer, error) {
+       var res bytes.Buffer
+       if err := tmpl.Execute(&res, &data); err != nil {
+               klog.Errorf("Invalid template: %v", err)
+               return bytes.Buffer{}, err
+       }
+
+       return res, nil
+}
+
+func selectTemplates(params InjectionParameters) []string {
+       if a, f := params.pod.Annotations[annotation.InjectTemplates.Name]; f {
+               names := []string{}
+               for _, tmplName := range strings.Split(a, ",") {
+                       name := strings.TrimSpace(tmplName)
+                       names = append(names, name)
+               }
+               return resolveAliases(params, names)
+       }
+       return resolveAliases(params, params.defaultTemplate)
+}
+
+func resolveAliases(params InjectionParameters, names []string) []string {
+       ret := []string{}
+       for _, name := range names {
+               if al, f := params.aliases[name]; f {
+                       ret = append(ret, al...)
+               } else {
+                       ret = append(ret, name)
+               }
+       }
+       return ret
+}
+
+func stripPod(req InjectionParameters) *corev1.Pod {
+       pod := req.pod.DeepCopy()
+       prevStatus := injectionStatus(pod)
+       if prevStatus == nil {
+               return req.pod
+       }
+       // We found a previous status annotation. Possibly we are re-injecting 
the pod
+       // To ensure idempotency, remove our injected containers first
+       for _, c := range prevStatus.Containers {
+               pod.Spec.Containers = modifyContainers(pod.Spec.Containers, c, 
Remove)
+       }
+       for _, c := range prevStatus.InitContainers {
+               pod.Spec.InitContainers = 
modifyContainers(pod.Spec.InitContainers, c, Remove)
+       }
+
+       delete(pod.Annotations, annotation.SidecarStatus.Name)
+
+       return pod
+}
+
+type ContainerReorder int
+
+const (
+       MoveFirst ContainerReorder = iota
+       MoveLast
+       Remove
+)
+
+func modifyContainers(cl []corev1.Container, name string, modifier 
ContainerReorder) []corev1.Container {
+       containers := []corev1.Container{}
+       var match *corev1.Container
+       for _, c := range cl {
+               if c.Name != name {
+                       containers = append(containers, c)
+               } else {
+                       match = &c
+               }
+       }
+       if match == nil {
+               return containers
+       }
+       switch modifier {
+       case MoveFirst:
+               return append([]corev1.Container{*match}, containers...)
+       case MoveLast:
+               return append(containers, *match)
+       case Remove:
+               return containers
+       default:
+               return cl
+       }
+}
+
+func injectionStatus(pod *corev1.Pod) *ProxylessInjectionStatus {
+       var statusBytes []byte
+       if pod.ObjectMeta.Annotations != nil {
+               if value, ok := 
pod.ObjectMeta.Annotations[annotation.SidecarStatus.Name]; ok {
+                       statusBytes = []byte(value)
+               }
+       }
+       if statusBytes == nil {
+               return nil
+       }
+
+       // default case when injected pod has explicit status
+       var iStatus ProxylessInjectionStatus
+       if err := json.Unmarshal(statusBytes, &iStatus); err != nil {
+               return nil
+       }
+       return &iStatus
+}
+
+func reinsertOverrides(pod *corev1.Pod) (*corev1.Pod, error) {
+       type podOverrides struct {
+               Containers     []corev1.Container `json:"containers,omitempty"`
+               InitContainers []corev1.Container 
`json:"initContainers,omitempty"`
+       }
+
+       existingOverrides := podOverrides{}
+       if annotationOverrides, f := 
pod.Annotations[annotation.ProxyOverrides.Name]; f {
+               if err := json.Unmarshal([]byte(annotationOverrides), 
&existingOverrides); err != nil {
+                       return nil, err
+               }
+       }
+
+       pod = pod.DeepCopy()
+       for _, c := range existingOverrides.Containers {
+               match := FindContainer(c.Name, pod.Spec.Containers)
+               if match != nil {
+                       continue
+               }
+               pod.Spec.Containers = append(pod.Spec.Containers, c)
+       }
+
+       for _, c := range existingOverrides.InitContainers {
+               match := FindContainer(c.Name, pod.Spec.InitContainers)
+               if match != nil {
+                       continue
+               }
+               pod.Spec.InitContainers = append(pod.Spec.InitContainers, c)
+       }
+
+       return pod, nil
+}
+
+func FindContainer(name string, containers []corev1.Container) 
*corev1.Container {
+       for i := range containers {
+               if containers[i].Name == name {
+                       return &containers[i]
+               }
+       }
+       return nil
+}
+
+func ProxyImage(values *opconfig.Values, image *proxyConfig.ProxyImage, 
annotations map[string]string) string {
+       imageName := "proxyxds"
+       return imageName
+}
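
For readers tracing the new injection flow: below is a minimal, illustrative sketch (not part of the commit) of the annotation round trip that injectionStatus/stripPod above rely on for idempotent re-injection. The struct and container names here are hypothetical; only annotation.SidecarStatus.Name and the JSON shape of ProxylessInjectionStatus come from the diff.

package main

import (
	"encoding/json"
	"fmt"

	"istio.io/api/annotation"
	corev1 "k8s.io/api/core/v1"
)

// Mirrors the fields of ProxylessInjectionStatus that matter for re-injection.
type injectionStatusAnnotation struct {
	Containers []string `json:"containers"`
	Revision   string   `json:"revision"`
}

func main() {
	pod := &corev1.Pod{}
	pod.Annotations = map[string]string{}

	// What the webhook records after injecting a proxy container (name is illustrative).
	raw, _ := json.Marshal(injectionStatusAnnotation{
		Containers: []string{"dubbo-proxy"},
		Revision:   "default",
	})
	pod.Annotations[annotation.SidecarStatus.Name] = string(raw)

	// What a later re-injection reads back (see injectionStatus) so that the
	// previously injected containers can be removed before the template reruns.
	var prev injectionStatusAnnotation
	_ = json.Unmarshal([]byte(pod.Annotations[annotation.SidecarStatus.Name]), &prev)
	fmt.Println(prev.Containers) // [dubbo-proxy]
}
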
diff --git a/pkg/kube/inject/watcher.go b/pkg/kube/inject/watcher.go
index e7988ef5..972ea539 100644
--- a/pkg/kube/inject/watcher.go
+++ b/pkg/kube/inject/watcher.go
@@ -61,13 +61,13 @@ func (w *fileWatcher) Run(stop <-chan struct{}) {
                select {
                case <-timerC:
                        timerC = nil
-                       sidecarConfig, valuesConfig, err := w.Get()
+                       proxylessconfig, valuesConfig, err := w.Get()
                        if err != nil {
                                klog.Errorf("update error: %v", err)
                                break
                        }
                        if w.handler != nil {
-                               if err := w.handler(sidecarConfig, 
valuesConfig); err != nil {
+                               if err := w.handler(proxylessconfig, 
valuesConfig); err != nil {
                                        klog.Errorf("update error: %v", err)
                                }
                        }
@@ -108,12 +108,12 @@ func (w *configMapWatcher) Run(stop <-chan struct{}) {
 }
 
 func (w *configMapWatcher) Get() (*Config, string, error) {
-       cms := w.client.Kube().CoreV1().ConfigMaps(w.namespace)
-       cm, err := cms.Get(context.TODO(), w.name, metav1.GetOptions{})
+       configmaps := w.client.Kube().CoreV1().ConfigMaps(w.namespace)
+       configmap, err := configmaps.Get(context.TODO(), w.name, 
metav1.GetOptions{})
        if err != nil {
                return nil, "", err
        }
-       return readConfigMap(cm, w.configKey, w.valuesKey)
+       return readConfigMap(configmap, w.configKey, w.valuesKey)
 }
 
 func NewConfigMapWatcher(client kube.Client, namespace, name, configKey, 
valuesKey string) Watcher {
@@ -125,13 +125,13 @@ func NewConfigMapWatcher(client kube.Client, namespace, 
name, configKey, valuesK
                valuesKey: valuesKey,
        }
        w.c = configmapwatcher.NewController(client, namespace, name, func(cm 
*v1.ConfigMap) {
-               sidecarConfig, valuesConfig, err := readConfigMap(cm, 
configKey, valuesKey)
+               proxylessConfig, valuesConfig, err := readConfigMap(cm, 
configKey, valuesKey)
                if err != nil {
                        klog.Warningf("failed to read injection config from 
ConfigMap: %v", err)
                        return
                }
                if w.handler != nil {
-                       if err := w.handler(sidecarConfig, valuesConfig); err 
!= nil {
+                       if err := w.handler(proxylessConfig, valuesConfig); err 
!= nil {
                                klog.Errorf("update error: %v", err)
                        }
                }
diff --git a/pkg/kube/inject/webhook.go b/pkg/kube/inject/webhook.go
index d8717c17..6f24befc 100644
--- a/pkg/kube/inject/webhook.go
+++ b/pkg/kube/inject/webhook.go
@@ -17,6 +17,8 @@ import (
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/serializer"
        "k8s.io/apimachinery/pkg/types"
+       "k8s.io/apimachinery/pkg/util/mergepatch"
+       "k8s.io/apimachinery/pkg/util/strategicpatch"
        "k8s.io/klog/v2"
        "net/http"
        "os"
@@ -254,11 +256,6 @@ func injectPod(req InjectionParameters) ([]byte, error) {
                return nil, fmt.Errorf("failed to re apply container: %v", err)
        }
 
-       // Apply some additional transformations to the pod
-       if err := postProcessPod(mergedPod, *injectedPodData, req); err != nil {
-               return nil, fmt.Errorf("failed to process pod: %v", err)
-       }
-
        patch, err := createPatch(mergedPod, originalPodSpec)
        if err != nil {
                return nil, fmt.Errorf("failed to create patch: %v", err)
@@ -267,7 +264,9 @@ func injectPod(req InjectionParameters) ([]byte, error) {
        return patch, nil
 }
 
-func reapplyOverwrittenContainers(finalPod *corev1.Pod, originalPod 
*corev1.Pod, templatePod *corev1.Pod, proxyConfig *meshconfig.ProxyConfig) 
(*corev1.Pod, error) {
+func reapplyOverwrittenContainers(finalPod *corev1.Pod, originalPod 
*corev1.Pod, templatePod *corev1.Pod,
+       proxyConfig *meshconfig.ProxyConfig,
+) (*corev1.Pod, error) {
        return finalPod, nil
 }
 
@@ -283,15 +282,61 @@ func createPatch(pod *corev1.Pod, original []byte) 
([]byte, error) {
        return json.Marshal(p)
 }
 
-func postProcessPod(pod *corev1.Pod, injectedPod corev1.Pod, req 
InjectionParameters) error {
-       if pod.Annotations == nil {
-               pod.Annotations = map[string]string{}
+func applyOverlayYAML(target *corev1.Pod, overlayYAML []byte) (*corev1.Pod, 
error) {
+       currentJSON, err := json.Marshal(target)
+       if err != nil {
+               return nil, err
        }
-       if pod.Labels == nil {
-               pod.Labels = map[string]string{}
+
+       pod := corev1.Pod{}
+       // Overlay the injected template onto the original podSpec
+       patched, err := StrategicMergePatchYAML(currentJSON, overlayYAML, pod)
+       if err != nil {
+               return nil, fmt.Errorf("strategic merge: %v", err)
        }
 
-       return nil
+       if err := json.Unmarshal(patched, &pod); err != nil {
+               return nil, fmt.Errorf("unmarshal patched pod: %v", err)
+       }
+       return &pod, nil
+}
+
+func StrategicMergePatchYAML(originalJSON []byte, patchYAML []byte, dataStruct 
any) ([]byte, error) {
+       schema, err := strategicpatch.NewPatchMetaFromStruct(dataStruct)
+       if err != nil {
+               return nil, err
+       }
+
+       originalMap, err := patchHandleUnmarshal(originalJSON, json.Unmarshal)
+       if err != nil {
+               return nil, err
+       }
+       patchMap, err := patchHandleUnmarshal(patchYAML, func(data []byte, v 
any) error {
+               return yaml.Unmarshal(data, v)
+       })
+       if err != nil {
+               return nil, err
+       }
+
+       result, err := 
strategicpatch.StrategicMergeMapPatchUsingLookupPatchMeta(originalMap, 
patchMap, schema)
+       if err != nil {
+               return nil, err
+       }
+
+       return json.Marshal(result)
+}
+
+func patchHandleUnmarshal(j []byte, unmarshal func(data []byte, v any) error) 
(map[string]any, error) {
+       if j == nil {
+               j = []byte("{}")
+       }
+
+       m := map[string]any{}
+       err := unmarshal(j, &m)
+       if err != nil {
+               return nil, mergepatch.ErrBadJSONDoc
+       }
+       return m, nil
 }
 
 func parseInjectEnvs(path string) map[string]string {
@@ -353,8 +398,8 @@ func (wh *Webhook) inject(ar *kube.AdmissionReview, path 
string) *kube.Admission
        if pod.ObjectMeta.Namespace == "" {
                pod.ObjectMeta.Namespace = req.Namespace
        }
-       klog.Infof("Namespace: %v podName: %s", pod.Namespace+"/"+podName)
-       klog.Infof("Process proxyless injection request")
+       klog.Infof("Pod: %s/%s", pod.Namespace, podName)
+       klog.Infof("Processing proxyless injection request (path=%s)", path)
 
        wh.mu.RLock()
        proxyConfig := wh.env.GetProxyConfigOrDefault(pod.Namespace, 
pod.Labels, pod.Annotations, wh.meshConfig)
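
For reference, a minimal standalone sketch of the strategic-merge overlay that applyOverlayYAML/StrategicMergePatchYAML above perform, using the same k8s.io/apimachinery strategicpatch package. The pod, container names, and overlay YAML are illustrative, not taken from the commit.

package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"sigs.k8s.io/yaml"
)

func main() {
	// The pod as it arrives at the webhook (illustrative).
	original := corev1.Pod{Spec: corev1.PodSpec{
		Containers: []corev1.Container{{Name: "app", Image: "app:v1"}},
	}}
	originalJSON, _ := json.Marshal(original)

	// The overlay rendered from the injection template (illustrative).
	overlayYAML := []byte(`
spec:
  containers:
  - name: dubbo-proxy
    image: proxyxds
`)
	overlayJSON, err := yaml.YAMLToJSON(overlayYAML)
	if err != nil {
		panic(err)
	}

	// Containers use "name" as the strategic-merge key, so the overlay
	// container is added alongside "app" instead of replacing the list.
	patched, err := strategicpatch.StrategicMergePatch(originalJSON, overlayJSON, corev1.Pod{})
	if err != nil {
		panic(err)
	}

	merged := corev1.Pod{}
	_ = json.Unmarshal(patched, &merged)
	for _, c := range merged.Spec.Containers {
		fmt.Println(c.Name, c.Image) // prints both app and dubbo-proxy
	}
}
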
diff --git a/pkg/kube/util.go b/pkg/kube/util.go
index 10309ed3..36ba6664 100644
--- a/pkg/kube/util.go
+++ b/pkg/kube/util.go
@@ -101,6 +101,41 @@ func SetRestDefaults(config *rest.Config) *rest.Config {
        return config
 }
 
+func StripPodUnusedFields(obj any) (any, error) {
+       t, ok := obj.(metav1.ObjectMetaAccessor)
+       if !ok {
+               // shouldn't happen
+               return obj, nil
+       }
+       // ManagedFields is large and we never use it
+       t.GetObjectMeta().SetManagedFields(nil)
+       // only container ports can be used
+       if pod := obj.(*corev1.Pod); pod != nil {
+               containers := []corev1.Container{}
+               for _, c := range pod.Spec.Containers {
+                       if len(c.Ports) > 0 {
+                               containers = append(containers, 
corev1.Container{
+                                       Ports: c.Ports,
+                               })
+                       }
+               }
+               oldSpec := pod.Spec
+               newSpec := corev1.PodSpec{
+                       Containers:         containers,
+                       ServiceAccountName: oldSpec.ServiceAccountName,
+                       NodeName:           oldSpec.NodeName,
+                       HostNetwork:        oldSpec.HostNetwork,
+                       Hostname:           oldSpec.Hostname,
+                       Subdomain:          oldSpec.Subdomain,
+               }
+               pod.Spec = newSpec
+               pod.Status.InitContainerStatuses = nil
+               pod.Status.ContainerStatuses = nil
+       }
+
+       return obj, nil
+}
+
 const MaxRequestBodyBytes = int64(6 * 1024 * 1024)
 
 func HTTPConfigReader(req *http.Request) ([]byte, error) {
diff --git a/pkg/webhooks/validation/controller/controller.go 
b/pkg/webhooks/validation/controller/controller.go
index 169f2582..bba26ab8 100644
--- a/pkg/webhooks/validation/controller/controller.go
+++ b/pkg/webhooks/validation/controller/controller.go
@@ -9,7 +9,6 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/ptr"
        "github.com/apache/dubbo-kubernetes/pkg/webhooks/util"
        "github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
-       "istio.io/api/label"
        kubeApiAdmission "k8s.io/api/admissionregistration/v1"
        klabels "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/types"
@@ -152,7 +151,7 @@ func newController(o Options, client kube.Client) 
*Controller {
                
controllers.WithRateLimiter(workqueue.NewTypedItemExponentialFailureRateLimiter[any](100*time.Millisecond,
 1*time.Minute)))
 
        c.webhooks = 
kclient.NewFiltered[*kubeApiAdmission.ValidatingWebhookConfiguration](client, 
kclient.Filter{
-               LabelSelector: fmt.Sprintf("%s=%s", label.IoIstioRev.Name, 
o.Revision),
+               LabelSelector: fmt.Sprintf("%s=%s", "dubbo.io/rev", o.Revision),
        })
        c.webhooks.AddEventHandler(controllers.ObjectHandler(c.queue.AddObject))
 
diff --git a/pkg/webhooks/webhookpatch.go b/pkg/webhooks/webhookpatch.go
index f8b507ac..5eaea60a 100644
--- a/pkg/webhooks/webhookpatch.go
+++ b/pkg/webhooks/webhookpatch.go
@@ -8,7 +8,6 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
        "github.com/apache/dubbo-kubernetes/pkg/webhooks/util"
        "github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
-       "istio.io/api/label"
        v1 "k8s.io/api/admissionregistration/v1"
        kerrors "k8s.io/apimachinery/pkg/api/errors"
        klabels "k8s.io/apimachinery/pkg/labels"
@@ -83,7 +82,7 @@ func (w *WebhookCertPatcher) 
patchMutatingWebhookConfig(webhookConfigName string
                return errNotFound
        }
        // prevents a race condition between multiple istiods when the revision 
is changed or modified
-       v, ok := config.Labels[label.IoIstioRev.Name]
+       v, ok := config.Labels["dubbo.io/rev"]
        if !ok {
                return nil
        }
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index 6207b934..97c1085e 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -47,7 +47,6 @@ import (
        "github.com/apache/dubbo-kubernetes/sail/pkg/server"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
-       
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/serviceentry"
        tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
        "github.com/apache/dubbo-kubernetes/sail/pkg/xds"
        "github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
@@ -99,7 +98,6 @@ type Server struct {
        ConfigStores           []model.ConfigStoreController
        configController       model.ConfigStoreController
        multiclusterController *multicluster.Controller
-       serviceEntryController *serviceentry.Controller
 
        fileWatcher         filewatcher.FileWatcher
        internalStop        chan struct{}
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index 5d0cd60f..eb147798 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -19,6 +19,7 @@ package model
 
 import (
        "cmp"
+       "github.com/apache/dubbo-kubernetes/pkg/cluster"
        "github.com/apache/dubbo-kubernetes/pkg/config"
        "github.com/apache/dubbo-kubernetes/pkg/config/host"
        "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
@@ -49,6 +50,8 @@ const (
        ProxyRequest           TriggerReason = "proxyrequest"
        GlobalUpdate           TriggerReason = "global"
        HeadlessEndpointUpdate TriggerReason = "headlessendpoint"
+       EndpointUpdate         TriggerReason = "endpoint"
+       ProxyUpdate            TriggerReason = "proxy"
 )
 
 type ProxyPushStatus struct {
@@ -148,7 +151,10 @@ func (pr *PushRequest) CopyMerge(other *PushRequest) 
*PushRequest {
 
 type XDSUpdater interface {
        ConfigUpdate(req *PushRequest)
-       SvcUpdate(shard ShardKey, hostname string, namespace string, event 
Event)
+       ServiceUpdate(shard ShardKey, hostname string, namespace string, event 
Event)
+       EDSUpdate(shard ShardKey, hostname string, namespace string, entry 
[]*DubboEndpoint)
+       EDSCacheUpdate(shard ShardKey, hostname string, namespace string, entry 
[]*DubboEndpoint)
+       ProxyUpdate(clusterID cluster.ID, ip string)
 }
 
 func (ps *PushContext) InitContext(env *Environment, oldPushContext 
*PushContext, pushReq *PushRequest) {
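
A minimal sketch of a recording implementation of the widened XDSUpdater interface above, the kind of fake a registry test might use. Package and field names are placeholders; the method signatures follow the interface in this diff.

package fakexds

import (
	"github.com/apache/dubbo-kubernetes/pkg/cluster"
	"github.com/apache/dubbo-kubernetes/sail/pkg/model"
)

// fakeXDSUpdater records which hostnames received EDS pushes and how many
// full pushes were requested.
type fakeXDSUpdater struct {
	edsHosts  []string
	fullPushes int
}

var _ model.XDSUpdater = &fakeXDSUpdater{}

func (f *fakeXDSUpdater) ConfigUpdate(req *model.PushRequest) {
	if req.Full {
		f.fullPushes++
	}
}

func (f *fakeXDSUpdater) ServiceUpdate(shard model.ShardKey, hostname, namespace string, event model.Event) {
}

func (f *fakeXDSUpdater) EDSUpdate(shard model.ShardKey, hostname, namespace string, entry []*model.DubboEndpoint) {
	f.edsHosts = append(f.edsHosts, hostname)
}

func (f *fakeXDSUpdater) EDSCacheUpdate(shard model.ShardKey, hostname, namespace string, entry []*model.DubboEndpoint) {
}

func (f *fakeXDSUpdater) ProxyUpdate(clusterID cluster.ID, ip string) {}
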
diff --git a/sail/pkg/model/service.go b/sail/pkg/model/service.go
index 0c69d941..95985f8b 100644
--- a/sail/pkg/model/service.go
+++ b/sail/pkg/model/service.go
@@ -7,6 +7,7 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/config/protocol"
        "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
        "github.com/apache/dubbo-kubernetes/pkg/maps"
+       "github.com/apache/dubbo-kubernetes/pkg/network"
        "github.com/apache/dubbo-kubernetes/pkg/slices"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
@@ -58,29 +59,30 @@ func (p *endpointDiscoverabilityPolicyImpl) CmpOpts() 
[]cmp.Option {
 type HealthStatus int32
 
 const (
-       // Healthy indicates an endpoint is ready to accept traffic
-       Healthy HealthStatus = 1
-       // UnHealthy indicates an endpoint is not ready to accept traffic
-       UnHealthy HealthStatus = 2
-       // Draining is a special case, which is used only when persistent 
sessions are enabled. This indicates an endpoint
-       // was previously healthy, but is now shutting down.
-       // Without persistent sessions, an endpoint that is shutting down will 
be marked as Terminating.
-       Draining HealthStatus = 3
-       // Terminating marks an endpoint as shutting down. Similar to 
"unhealthy", this means we should not send it traffic.
-       // But unlike "unhealthy", this means we do not consider it when 
calculating failover.
+       Healthy     HealthStatus = 1
+       UnHealthy   HealthStatus = 2
+       Draining    HealthStatus = 3
        Terminating HealthStatus = 4
 )
 
 type DubboEndpoint struct {
        ServiceAccount         string
        Addresses              []string
-       WorkloadName           string
        ServicePortName        string
        Labels                 labels.Instance
        HealthStatus           HealthStatus
        SendUnhealthyEndpoints bool
        DiscoverabilityPolicy  EndpointDiscoverabilityPolicy `json:"-"`
        LegacyClusterPortKey   int
+       EndpointPort           uint32
+       WorkloadName           string
+       Network                network.ID
+       Namespace              string
+       // Specifies the hostname of the Pod, empty for vm workload.
+       HostName string
+       // If specified, the fully qualified Pod hostname will be 
"<hostname>.<subdomain>.<pod namespace>.svc.<cluster domain>".
+       SubDomain string
+       NodeName  string
 }
 
 func (ep *DubboEndpoint) FirstAddressOrNil() string {
@@ -413,3 +415,7 @@ func (s *ServiceAttributes) Equals(other 
*ServiceAttributes) bool {
        return s.Name == other.Name && s.Namespace == other.Namespace &&
                s.ServiceRegistry == other.ServiceRegistry && s.K8sAttributes 
== other.K8sAttributes
 }
+
+func (s *Service) SupportsUnhealthyEndpoints() bool {
+       return false
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/controller.go 
b/sail/pkg/serviceregistry/kube/controller/controller.go
index 839ad194..558cfcb3 100644
--- a/sail/pkg/serviceregistry/kube/controller/controller.go
+++ b/sail/pkg/serviceregistry/kube/controller/controller.go
@@ -20,23 +20,31 @@ package controller
 import (
        "github.com/apache/dubbo-kubernetes/pkg/cluster"
        "github.com/apache/dubbo-kubernetes/pkg/config/host"
+       "github.com/apache/dubbo-kubernetes/pkg/config/labels"
        "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
        "github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
        "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
        kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
        "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
        "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
        "github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+       "github.com/apache/dubbo-kubernetes/pkg/network"
        "github.com/apache/dubbo-kubernetes/pkg/ptr"
        "github.com/apache/dubbo-kubernetes/pkg/queue"
        "github.com/apache/dubbo-kubernetes/pkg/slices"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/kube"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+       "github.com/hashicorp/go-multierror"
        "go.uber.org/atomic"
+       "istio.io/api/label"
        v1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       klabels "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/klog/v2"
        "sort"
@@ -44,7 +52,12 @@ import (
        "time"
 )
 
+type controllerInterface interface {
+       Network(endpointIP string, labels labels.Instance) network.ID
+}
+
 var (
+       _ controllerInterface      = &Controller{}
        _ serviceregistry.Instance = &Controller{}
 )
 
@@ -58,8 +71,13 @@ type Controller struct {
        configCluster       bool
        services            kclient.Client[*v1.Service]
        endpoints           *endpointSliceController
-       meshWatcher         mesh.Watcher
-       handlers            model.ControllerHandlers
+       podsClient          kclient.Client[*v1.Pod]
+       namespaces          kclient.Client[*v1.Namespace]
+
+       meshWatcher mesh.Watcher
+       handlers    model.ControllerHandlers
+       pods        *PodCache
+       *networkManager
 }
 
 func NewController(kubeClient kubelib.Client, options Options) *Controller {
@@ -72,13 +90,75 @@ func NewController(kubeClient kubelib.Client, options 
Options) *Controller {
 
                configCluster: options.ConfigCluster,
        }
+       c.networkManager = initNetworkManager(c, options)
+
+       c.namespaces = kclient.New[*v1.Namespace](kubeClient)
+       if c.opts.SystemNamespace != "" {
+               registerHandlers[*v1.Namespace](
+                       c,
+                       c.namespaces,
+                       "Namespaces",
+                       func(old *v1.Namespace, cur *v1.Namespace, event 
model.Event) error {
+                               if cur.Name == c.opts.SystemNamespace {
+                                       return c.onSystemNamespaceEvent(old, 
cur, event)
+                               }
+                               return nil
+                       },
+                       nil,
+               )
+       }
+
        c.services = kclient.NewFiltered[*v1.Service](kubeClient, 
kclient.Filter{ObjectFilter: kubeClient.ObjectFilter()})
        registerHandlers(c, c.services, "Services", c.onServiceEvent, nil)
        c.endpoints = newEndpointSliceController(c)
+
+       c.podsClient = kclient.NewFiltered[*v1.Pod](kubeClient, kclient.Filter{
+               ObjectFilter:    kubeClient.ObjectFilter(),
+               ObjectTransform: kubelib.StripPodUnusedFields,
+       })
+       c.pods = newPodCache(c, c.podsClient, func(key types.NamespacedName) {
+               c.queue.Push(func() error {
+                       return c.endpoints.podArrived(key.Name, key.Namespace)
+               })
+       })
+       registerHandlers[*v1.Pod](c, c.podsClient, "Pods", c.pods.onEvent, nil)
        c.meshWatcher = options.MeshWatcher
        return c
 }
 
+func (c *Controller) onSystemNamespaceEvent(_, ns *v1.Namespace, ev 
model.Event) error {
+       if ev == model.EventDelete {
+               return nil
+       }
+       if c.setNetworkFromNamespace(ns) {
+               // network changed; this rarely happens
+               // refresh pods/endpoints/services
+               c.onNetworkChange()
+       }
+       return nil
+}
+
+func (c *Controller) onNetworkChange() {
+       // the network for endpoints is computed when we process the events; this will fix the cache
+       // NOTE: this must run before the other network watcher handler that 
creates a force push
+       if err := c.syncPods(); err != nil {
+               klog.Errorf("one or more errors force-syncing pods: %v", err)
+       }
+       if err := c.endpoints.initializeNamespace(metav1.NamespaceAll, true); 
err != nil {
+               klog.Errorf("one or more errors force-syncing endpoints: %v", 
err)
+       }
+
+}
+
+func (c *Controller) syncPods() error {
+       var err *multierror.Error
+       pods := c.podsClient.List(metav1.NamespaceAll, klabels.Everything())
+       for _, s := range pods {
+               err = multierror.Append(err, c.pods.onEvent(nil, s, 
model.EventAdd))
+       }
+       return err.ErrorOrNil()
+}
+
 type Options struct {
        KubernetesAPIQPS      float32
        KubernetesAPIBurst    int
@@ -138,7 +218,7 @@ func (c *Controller) deleteService(svc *model.Service) {
 
        shard := model.ShardKeyFromRegistry(c)
        event := model.EventDelete
-       c.opts.XDSUpdater.SvcUpdate(shard, string(svc.Hostname), 
svc.Attributes.Namespace, event)
+       c.opts.XDSUpdater.ServiceUpdate(shard, string(svc.Hostname), 
svc.Attributes.Namespace, event)
        if !svc.Attributes.ExportTo.Contains(visibility.None) {
                c.handlers.NotifyServiceHandlers(nil, svc, event)
        }
@@ -155,13 +235,50 @@ func (c *Controller) addOrUpdateService(pre, curr 
*v1.Service, currConv *model.S
        shard := model.ShardKeyFromRegistry(c)
        ns := currConv.Attributes.Namespace
 
-       c.opts.XDSUpdater.SvcUpdate(shard, string(currConv.Hostname), ns, event)
+       c.opts.XDSUpdater.ServiceUpdate(shard, string(currConv.Hostname), ns, 
event)
        if serviceUpdateNeedsPush(pre, curr, prevConv, currConv) {
                klog.V(2).Infof("Service %s in namespace %s updated and needs 
push", currConv.Hostname, ns)
                c.handlers.NotifyServiceHandlers(prevConv, currConv, event)
        }
 }
 
+func (c *Controller) recomputeServiceForPod(pod *v1.Pod) {
+       allServices := c.services.List(pod.Namespace, klabels.Everything())
+       cu := sets.New[model.ConfigKey]()
+       services := getPodServices(allServices, pod)
+       for _, svc := range services {
+               hostname := kube.ServiceHostname(svc.Name, svc.Namespace, 
c.opts.DomainSuffix)
+               c.Lock()
+               conv, f := c.servicesMap[hostname]
+               c.Unlock()
+               if !f {
+                       return
+               }
+               shard := model.ShardKeyFromRegistry(c)
+               endpoints := c.buildEndpointsForService(conv, true)
+               if len(endpoints) > 0 {
+                       c.opts.XDSUpdater.EDSCacheUpdate(shard, 
string(hostname), svc.Namespace, endpoints)
+               }
+               cu.Insert(model.ConfigKey{
+                       Kind:      kind.ServiceEntry,
+                       Name:      string(hostname),
+                       Namespace: svc.Namespace,
+               })
+       }
+       if len(cu) > 0 {
+               c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
+                       Full:           false,
+                       ConfigsUpdated: cu,
+                       Reason:         
model.NewReasonStats(model.EndpointUpdate),
+               })
+       }
+}
+
+func (c *Controller) buildEndpointsForService(svc *model.Service, updateCache 
bool) []*model.DubboEndpoint {
+       endpoints := 
c.endpoints.buildDubboEndpointsWithService(svc.Attributes.Name, 
svc.Attributes.Namespace, svc.Hostname, updateCache)
+       return endpoints
+}
+
 func serviceUpdateNeedsPush(prev, curr *v1.Service, preConv, currConv 
*model.Service) bool {
        // New Service - If it is not exported, no need to push.
        if preConv == nil {
@@ -223,7 +340,17 @@ func (c *Controller) HasSynced() bool {
 }
 
 func (c *Controller) informersSynced() bool {
-       return true
+       return c.namespaces.HasSynced() &&
+               c.pods.pods.HasSynced() &&
+               c.services.HasSynced() &&
+               c.endpoints.slices.HasSynced() &&
+               c.networkManager.HasSynced()
+}
+
+func (c *Controller) hostNamesForNamespacedName(name types.NamespacedName) 
[]host.Name {
+       return []host.Name{
+               kube.ServiceHostname(name.Name, name.Namespace, 
c.opts.DomainSuffix),
+       }
 }
 
 type FilterOutFunc[T controllers.Object] func(old, cur T) bool
@@ -273,3 +400,20 @@ func (c *Controller) servicesForNamespacedName(name 
types.NamespacedName) []*mod
        }
        return nil
 }
+
+func (c *Controller) Network(endpointIP string, labels labels.Instance) 
network.ID {
+       // 1. check the pod/workloadEntry label
+       if nw := labels[label.TopologyNetwork.Name]; nw != "" {
+               return network.ID(nw)
+       }
+       // 2. check the system namespace labels
+       if nw := c.networkFromSystemNamespace(); nw != "" {
+               return nw
+       }
+
+       // 3. check the meshNetworks config
+       if nw := c.networkFromMeshNetworks(endpointIP); nw != "" {
+               return nw
+       }
+       return ""
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/endpoint_builder.go 
b/sail/pkg/serviceregistry/kube/controller/endpoint_builder.go
new file mode 100644
index 00000000..83b32eb0
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/endpoint_builder.go
@@ -0,0 +1,97 @@
+package controller
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+       "github.com/apache/dubbo-kubernetes/pkg/network"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/kube"
+       labelutil 
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/util/label"
+       v1 "k8s.io/api/core/v1"
+)
+
+type EndpointBuilder struct {
+       controller     controllerInterface
+       labels         labels.Instance
+       metaNetwork    network.ID
+       serviceAccount string
+       workloadName   string
+       namespace      string
+       hostname       string
+       subDomain      string
+       nodeName       string
+}
+
+func (c *Controller) NewEndpointBuilder(pod *v1.Pod) *EndpointBuilder {
+       var sa, namespace, hostname, subdomain, ip, node string
+       var podLabels labels.Instance
+       if pod != nil {
+               sa = kube.SecureNamingSAN(pod, c.meshWatcher.Mesh())
+               podLabels = pod.Labels
+               namespace = pod.Namespace
+               subdomain = pod.Spec.Subdomain
+               if subdomain != "" {
+                       hostname = pod.Spec.Hostname
+                       if hostname == "" {
+                               hostname = pod.Name
+                       }
+               }
+               node = pod.Spec.NodeName
+       }
+       out := &EndpointBuilder{
+               controller:     c,
+               serviceAccount: sa,
+               namespace:      namespace,
+               hostname:       hostname,
+               subDomain:      subdomain,
+               labels:         podLabels,
+               nodeName:       node,
+       }
+       networkID := out.endpointNetwork(ip)
+       out.labels = labelutil.AugmentLabels(podLabels, c.Cluster(), "", node, 
networkID)
+       return out
+}
+
+func (b *EndpointBuilder) buildDubboEndpoint(
+       endpointAddress string,
+       endpointPort int32,
+       svcPortName string,
+       discoverabilityPolicy model.EndpointDiscoverabilityPolicy,
+       healthStatus model.HealthStatus,
+       sendUnhealthy bool,
+) *model.DubboEndpoint {
+       if b == nil {
+               return nil
+       }
+
+       // In case the pod was not found when the EndpointBuilder was initialized.
+       networkID := network.ID(b.labels["topology.dubbo.io/network"])
+       if networkID == "" {
+               networkID = b.endpointNetwork(endpointAddress)
+               b.labels["topology.dubbo.io/network"] = string(networkID)
+       }
+
+       return &model.DubboEndpoint{
+               Labels:                 b.labels,
+               ServiceAccount:         b.serviceAccount,
+               Addresses:              []string{endpointAddress},
+               EndpointPort:           uint32(endpointPort),
+               ServicePortName:        svcPortName,
+               Network:                networkID,
+               Namespace:              b.namespace,
+               HostName:               b.hostname,
+               SubDomain:              b.subDomain,
+               DiscoverabilityPolicy:  discoverabilityPolicy,
+               HealthStatus:           healthStatus,
+               SendUnhealthyEndpoints: sendUnhealthy,
+               NodeName:               b.nodeName,
+       }
+}
+
+func (b *EndpointBuilder) endpointNetwork(endpointIP string) network.ID {
+       // If we're building the endpoint based on proxy meta, prefer the 
injected ISTIO_META_NETWORK value.
+       if b.metaNetwork != "" {
+               return b.metaNetwork
+       }
+
+       return b.controller.Network(endpointIP, b.labels)
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/endpointslice.go 
b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
index 3e240cc9..0af5fa38 100644
--- a/sail/pkg/serviceregistry/kube/controller/endpointslice.go
+++ b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
@@ -2,18 +2,23 @@ package controller
 
 import (
        "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/host"
        "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
        "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
        "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "github.com/hashicorp/go-multierror"
        "istio.io/api/annotation"
        corev1 "k8s.io/api/core/v1"
        v1 "k8s.io/api/discovery/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        klabels "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/selection"
+       "k8s.io/apimachinery/pkg/types"
        mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
        "strings"
+       "sync"
 )
 
 var (
@@ -22,15 +27,29 @@ var (
 )
 
 type endpointSliceController struct {
-       slices kclient.Client[*v1.EndpointSlice]
-       c      *Controller
+       endpointCache *endpointSliceCache
+       slices        kclient.Client[*v1.EndpointSlice]
+       c             *Controller
+}
+
+type endpointSliceCache struct {
+       mu                         sync.RWMutex
+       endpointsByServiceAndSlice 
map[host.Name]map[string][]*model.DubboEndpoint
+}
+
+func newEndpointSliceCache() *endpointSliceCache {
+       out := &endpointSliceCache{
+               endpointsByServiceAndSlice: 
make(map[host.Name]map[string][]*model.DubboEndpoint),
+       }
+       return out
 }
 
 func newEndpointSliceController(c *Controller) *endpointSliceController {
        slices := kclient.NewFiltered[*v1.EndpointSlice](c.client, 
kclient.Filter{ObjectFilter: c.client.ObjectFilter()})
        out := &endpointSliceController{
-               c:      c,
-               slices: slices,
+               c:             c,
+               slices:        slices,
+               endpointCache: newEndpointSliceCache(),
        }
        registerHandlers[*v1.EndpointSlice](c, slices, "EndpointSlice", 
out.onEvent, nil)
        return out
@@ -46,15 +65,11 @@ func (esc *endpointSliceController) onEventInternal(_, ep 
*v1.EndpointSlice, eve
        if !endpointSliceSelector.Matches(klabels.Set(esLabels)) {
                return
        }
-       // Update internal endpoint cache no matter what kind of service, even 
headless service.
-       // As for gateways, the cluster discovery type is `EDS` for headless 
service.
-       // namespacedName := getServiceNamespacedName(ep)
-       // log.Debugf("Handle EDS endpoint %s %s in namespace %s", 
namespacedName.Name, event, namespacedName.Namespace)
-       // if event == model.EventDelete {
-       //      esc.deleteEndpointSlice(ep)
-       // } else {
-       //      esc.updateEndpointSlice(ep)
-       // }
+       if event == model.EventDelete {
+               esc.deleteEndpointSlice(ep)
+       } else {
+               esc.updateEndpointSlice(ep)
+       }
 
        // Now check if we need to do a full push for the service.
        // If the service is headless, we need to do a full push if service 
exposes TCP ports
@@ -66,10 +81,9 @@ func (esc *endpointSliceController) onEventInternal(_, ep 
*v1.EndpointSlice, eve
                return
        }
 
-       // hostnames := esc.c.hostNamesForNamespacedName(namespacedName)
-       // log.Debugf("triggering EDS push for %s in namespace %s", hostnames, 
namespacedName.Namespace)
-       // Trigger EDS push for all hostnames.
-       // esc.pushEDS(hostnames, namespacedName.Namespace)
+       namespacedName := getServiceNamespacedName(ep)
+       hostnames := esc.c.hostNamesForNamespacedName(namespacedName)
+       esc.pushEDS(hostnames, namespacedName.Namespace)
 
        if svc == nil || svc.Spec.ClusterIP != corev1.ClusterIPNone || 
svc.Spec.Type == corev1.ServiceTypeExternalName {
                return
@@ -85,8 +99,6 @@ func (esc *endpointSliceController) onEventInternal(_, ep 
*v1.EndpointSlice, eve
                        }
                }
                if supportsOnlyHTTP {
-                       // pure HTTP headless services should not need a full 
push since they do not
-                       // require a Listener based on IP: 
https://github.com/istio/istio/issues/48207
                        configsUpdated.Insert(model.ConfigKey{Kind: 
kind.DNSName, Name: modelSvc.Hostname.String(), Namespace: svc.Namespace})
                } else {
                        configsUpdated.Insert(model.ConfigKey{Kind: 
kind.ServiceEntry, Name: modelSvc.Hostname.String(), Namespace: svc.Namespace})
@@ -102,10 +114,186 @@ func (esc *endpointSliceController) onEventInternal(_, 
ep *v1.EndpointSlice, eve
        }
 }
 
+func (esc *endpointSliceController) deleteEndpointSlice(slice 
*v1.EndpointSlice) {
+       key := config.NamespacedName(slice)
+       for _, e := range slice.Endpoints {
+               for _, a := range e.Addresses {
+                       esc.c.pods.endpointDeleted(key, a)
+               }
+       }
+
+       esc.endpointCache.mu.Lock()
+       defer esc.endpointCache.mu.Unlock()
+       for _, hostName := range 
esc.c.hostNamesForNamespacedName(getServiceNamespacedName(slice)) {
+               // endpointSlice cache update
+               if esc.endpointCache.has(hostName) {
+                       esc.endpointCache.delete(hostName, slice.Name)
+               }
+       }
+}
+
+func (e *endpointSliceCache) has(hostname host.Name) bool {
+       _, found := e.endpointsByServiceAndSlice[hostname]
+       return found
+}
+
+func (e *endpointSliceCache) delete(hostname host.Name, slice string) {
+       delete(e.endpointsByServiceAndSlice[hostname], slice)
+       if len(e.endpointsByServiceAndSlice[hostname]) == 0 {
+               delete(e.endpointsByServiceAndSlice, hostname)
+       }
+}
+
+func (esc *endpointSliceController) updateEndpointSlice(slice 
*v1.EndpointSlice) {
+       for _, hostname := range 
esc.c.hostNamesForNamespacedName(getServiceNamespacedName(slice)) {
+               esc.updateEndpointCacheForSlice(hostname, slice)
+       }
+}
+
+func (esc *endpointSliceController) initializeNamespace(ns string, filtered 
bool) error {
+       var err *multierror.Error
+       var endpoints []*v1.EndpointSlice
+       if filtered {
+               endpoints = esc.slices.List(ns, klabels.Everything())
+       } else {
+               endpoints = esc.slices.ListUnfiltered(ns, klabels.Everything())
+       }
+       for _, s := range endpoints {
+               err = multierror.Append(err, esc.onEvent(nil, s, 
model.EventAdd))
+       }
+       return err.ErrorOrNil()
+}
+
+func (esc *endpointSliceController) updateEndpointCacheForSlice(hostName 
host.Name, epSlice *v1.EndpointSlice) {
+       var endpoints []*model.DubboEndpoint
+       if epSlice.AddressType == v1.AddressTypeFQDN {
+               return
+       }
+       svc := esc.c.GetService(hostName)
+       svcNamespacedName := getServiceNamespacedName(epSlice)
+       // This is not an EndpointSlice for a Service; ignore it.
+       if svcNamespacedName.Name == "" {
+               return
+       }
+
+       for _, e := range epSlice.Endpoints {
+               // Draining tracking is only enabled if persistent sessions is enabled.
+               // If we start using them for other features, this can be adjusted.
+               healthStatus := endpointHealthStatus(svc, e)
+               for _, a := range e.Addresses {
+                       pod, expectedPod := getPod(esc.c, a, &metav1.ObjectMeta{Name: epSlice.Name, Namespace: epSlice.Namespace}, e.TargetRef, hostName)
+                       if pod == nil && expectedPod {
+                               continue
+                       }
+
+                       var overrideAddresses []string
+                       builder := esc.c.NewEndpointBuilder(pod)
+                       // EDS and ServiceEntry use name for service port - ADS will need to map to numbers.
+                       for _, port := range epSlice.Ports {
+                               var portNum int32
+                               if port.Port != nil {
+                                       portNum = *port.Port
+                               }
+                               var portName string
+                               if port.Name != nil {
+                                       portName = *port.Name
+                               }
+
+                               dubboEndpoint := builder.buildDubboEndpoint(a, portNum, portName, nil, healthStatus, svc.SupportsUnhealthyEndpoints())
+                               if len(overrideAddresses) > 1 {
+                                       dubboEndpoint.Addresses = overrideAddresses
+                               }
+                               endpoints = append(endpoints, dubboEndpoint)
+                       }
+               }
+       }
+       esc.endpointCache.Update(hostName, epSlice.Name, endpoints)
+}
+
+func (e *endpointSliceCache) Update(hostname host.Name, slice string, endpoints []*model.DubboEndpoint) {
+       e.mu.Lock()
+       defer e.mu.Unlock()
+       e.update(hostname, slice, endpoints)
+}
+
+func (e *endpointSliceCache) update(hostname host.Name, slice string, endpoints []*model.DubboEndpoint) {
+       if len(endpoints) == 0 {
+               delete(e.endpointsByServiceAndSlice[hostname], slice)
+       }
+       if _, f := e.endpointsByServiceAndSlice[hostname]; !f {
+               e.endpointsByServiceAndSlice[hostname] = make(map[string][]*model.DubboEndpoint)
+       }
+       }
+       // We will always overwrite. A conflict here means an endpoint is transitioning
+       // from one slice to another. See
+       // https://github.com/kubernetes/website/blob/master/content/en/docs/concepts/services-networking/endpoint-slices.md#duplicate-endpoints
+       // In this case, we can always assume the update is fresh, although older slices
+       // for which we have not gotten updates may be stale; therefore we always take
+       // the new update.
+       e.endpointsByServiceAndSlice[hostname][slice] = endpoints
+}
+
+func endpointHealthStatus(svc *model.Service, e v1.Endpoint) model.HealthStatus {
+       if e.Conditions.Ready == nil || *e.Conditions.Ready {
+               return model.Healthy
+       }
+
+       // If it is shutting down, mark it as terminating. This occurs regardless of whether it was previously healthy or not.
+       if svc != nil &&
+               (e.Conditions.Terminating == nil || *e.Conditions.Terminating) {
+               return model.Terminating
+       }
+
+       return model.UnHealthy
+}
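
Illustrative sketch (not part of the patch): the mapping from EndpointSlice conditions to a health state can be exercised standalone with the upstream discovery/v1 types. The HealthStatus values below are local stand-ins for the model package's constants, and the terminating check is simplified to an explicit Terminating=true, which is stricter than the svc-dependent check above.

package main

import (
    "fmt"

    discoveryv1 "k8s.io/api/discovery/v1"
)

// Local stand-ins for model.Healthy / model.Terminating / model.UnHealthy.
type healthStatus string

const (
    healthy     healthStatus = "Healthy"
    terminating healthStatus = "Terminating"
    unhealthy   healthStatus = "UnHealthy"
)

// statusOf: a nil Ready condition is treated as ready; a not-ready endpoint is
// either draining (explicit Terminating=true here) or unhealthy.
func statusOf(e discoveryv1.Endpoint) healthStatus {
    if e.Conditions.Ready == nil || *e.Conditions.Ready {
        return healthy
    }
    if e.Conditions.Terminating != nil && *e.Conditions.Terminating {
        return terminating
    }
    return unhealthy
}

func main() {
    ready, notReady, term := true, false, true
    fmt.Println(statusOf(discoveryv1.Endpoint{Conditions: discoveryv1.EndpointConditions{Ready: &ready}}))
    fmt.Println(statusOf(discoveryv1.Endpoint{Conditions: discoveryv1.EndpointConditions{Ready: &notReady, Terminating: &term}}))
    fmt.Println(statusOf(discoveryv1.Endpoint{Conditions: discoveryv1.EndpointConditions{Ready: &notReady}}))
}
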
+
 func serviceNameForEndpointSlice(labels map[string]string) string {
        return labels[v1.LabelServiceName]
 }
 
+func getPod(c *Controller, ip string, ep *metav1.ObjectMeta, targetRef *corev1.ObjectReference, host host.Name) (*corev1.Pod, bool) {
+       var expectPod bool
+       pod := c.getPod(ip, ep.Namespace, targetRef)
+       if targetRef != nil && targetRef.Kind == kind.Pod.String() {
+               expectPod = true
+               if pod == nil {
+                       c.registerEndpointResync(ep, ip, host)
+               }
+       }
+
+       return pod, expectPod
+}
+
+func (c *Controller) registerEndpointResync(ep *metav1.ObjectMeta, ip string, host host.Name) {
+       // Tell pod cache we want to queue the endpoint event when this pod arrives.
+       c.pods.queueEndpointEventOnPodArrival(config.NamespacedName(ep), ip)
+}
+
+func (c *Controller) getPod(ip string, namespace string, targetRef *corev1.ObjectReference) *corev1.Pod {
+       if targetRef != nil && targetRef.Kind == kind.Pod.String() {
+               key := types.NamespacedName{Name: targetRef.Name, Namespace: targetRef.Namespace}
+               pod := c.pods.getPodByKey(key)
+               return pod
+       }
+       // This means the endpoint is manually controlled.
+       // We will want to look up a pod to find metadata like service account, labels, etc. But for hostNetwork, we just get a raw IP,
+       // and the IP may be shared by many pods. The best we can do is guess.
+       pods := c.pods.getPodsByIP(ip)
+       for _, p := range pods {
+               if p.Namespace == namespace {
+                       // Might not be right, but best we can do.
+                       return p
+               }
+       }
+       return nil
+}
+
+func getServiceNamespacedName(slice *v1.EndpointSlice) types.NamespacedName {
+       return types.NamespacedName{
+               Namespace: slice.GetNamespace(),
+               Name:      serviceNameForEndpointSlice(slice.GetLabels()),
+       }
+}
+
 func serviceNeedsPush(svc *corev1.Service) bool {
        if svc.Annotations[annotation.NetworkingExportTo.Name] != "" {
                namespaces := strings.Split(svc.Annotations[annotation.NetworkingExportTo.Name], ",")
@@ -118,3 +306,73 @@ func serviceNeedsPush(svc *corev1.Service) bool {
        }
        return true
 }
+
+func (esc *endpointSliceController) pushEDS(hostnames []host.Name, namespace string) {
+       shard := model.ShardKeyFromRegistry(esc.c)
+       esc.endpointCache.mu.Lock()
+       defer esc.endpointCache.mu.Unlock()
+
+       for _, hostname := range hostnames {
+               endpoints := esc.endpointCache.get(hostname)
+               esc.c.opts.XDSUpdater.EDSUpdate(shard, string(hostname), namespace, endpoints)
+       }
+}
+
+type endpointKey struct {
+       ip   string
+       port string
+}
+
+func (e *endpointSliceCache) get(hostname host.Name) []*model.DubboEndpoint {
+       var endpoints []*model.DubboEndpoint
+       found := sets.New[endpointKey]()
+       for _, eps := range e.endpointsByServiceAndSlice[hostname] {
+               for _, ep := range eps {
+                       key := endpointKey{ep.FirstAddressOrNil(), ep.ServicePortName}
+                       if found.InsertContains(key) {
+                               // This is a duplicate. Update() already handles conflict resolution, so we don't
+                               // need to pick the "right" one here.
+                               continue
+                       }
+                       endpoints = append(endpoints, ep)
+               }
+       }
+       return endpoints
+}
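
Illustrative sketch (not part of the patch): a minimal, self-contained version of the two-level cache above (hostname -> slice name -> endpoints) with read-time de-duplication by address and port name. The simpleEndpoint type is a stand-in for model.DubboEndpoint.

package main

import "fmt"

// simpleEndpoint is a simplified stand-in for model.DubboEndpoint.
type simpleEndpoint struct {
    Address  string
    PortName string
}

type sliceCache struct {
    // hostname -> slice name -> endpoints contributed by that slice.
    byServiceAndSlice map[string]map[string][]simpleEndpoint
}

// update overwrites the endpoints contributed by one slice; an endpoint moving
// between slices simply shows up under the newest slice's entry.
func (c *sliceCache) update(hostname, slice string, eps []simpleEndpoint) {
    if c.byServiceAndSlice == nil {
        c.byServiceAndSlice = map[string]map[string][]simpleEndpoint{}
    }
    if len(eps) == 0 {
        delete(c.byServiceAndSlice[hostname], slice)
        return
    }
    if c.byServiceAndSlice[hostname] == nil {
        c.byServiceAndSlice[hostname] = map[string][]simpleEndpoint{}
    }
    c.byServiceAndSlice[hostname][slice] = eps
}

// get flattens all slices for a hostname, skipping duplicate (address, port) pairs.
func (c *sliceCache) get(hostname string) []simpleEndpoint {
    seen := map[simpleEndpoint]bool{}
    var out []simpleEndpoint
    for _, eps := range c.byServiceAndSlice[hostname] {
        for _, ep := range eps {
            if seen[ep] {
                continue
            }
            seen[ep] = true
            out = append(out, ep)
        }
    }
    return out
}

func main() {
    c := &sliceCache{}
    c.update("demo.default.svc.cluster.local", "demo-abc", []simpleEndpoint{{"10.0.0.1", "http"}})
    c.update("demo.default.svc.cluster.local", "demo-def", []simpleEndpoint{{"10.0.0.1", "http"}, {"10.0.0.2", "http"}})
    fmt.Println(len(c.get("demo.default.svc.cluster.local"))) // 2: the duplicate 10.0.0.1 is collapsed
}
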
+
+func (esc *endpointSliceController) podArrived(name, ns string) error {
+       ep := esc.slices.Get(name, ns)
+       if ep == nil {
+               return nil
+       }
+       return esc.onEvent(nil, ep, model.EventAdd)
+}
+
+func (esc *endpointSliceController) buildDubboEndpointsWithService(name, namespace string, hostName host.Name, updateCache bool) []*model.DubboEndpoint {
+       esLabelSelector := endpointSliceSelectorForService(name)
+       slices := esc.slices.List(namespace, esLabelSelector)
+       if len(slices) == 0 {
+               return nil
+       }
+
+       if updateCache {
+               // A cache update was requested. Rebuild the endpoints for these slices.
+               for _, slice := range slices {
+                       esc.updateEndpointCacheForSlice(hostName, slice)
+               }
+       }
+
+       return esc.endpointCache.Get(hostName)
+}
+
+func (e *endpointSliceCache) Get(hostname host.Name) []*model.DubboEndpoint {
+       e.mu.RLock()
+       defer e.mu.RUnlock()
+       return e.get(hostname)
+}
+
+func endpointSliceSelectorForService(name string) klabels.Selector {
+       return klabels.Set(map[string]string{
+               v1.LabelServiceName: name,
+       }).AsSelectorPreValidated().Add(*endpointSliceRequirement)
+}
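
Illustrative sketch (not part of the patch): the selector above keys off the standard kubernetes.io/service-name label that the EndpointSlice controller stamps on every slice. The extra endpointSliceRequirement is defined elsewhere in the controller and is not reproduced here; this snippet only shows the service-name match using the upstream client libraries, with hypothetical label values.

package main

import (
    "fmt"

    discoveryv1 "k8s.io/api/discovery/v1"
    klabels "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // Select the EndpointSlices that belong to Service "demo".
    selector := klabels.Set{discoveryv1.LabelServiceName: "demo"}.AsSelectorPreValidated()

    // Labels as they would appear on a controller-managed slice for that Service.
    sliceLabels := klabels.Set{
        discoveryv1.LabelServiceName: "demo",
        discoveryv1.LabelManagedBy:   "endpointslice-controller.k8s.io",
    }
    fmt.Println(selector.Matches(sliceLabels)) // true
}
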
diff --git a/sail/pkg/serviceregistry/kube/controller/multicluster.go b/sail/pkg/serviceregistry/kube/controller/multicluster.go
index cad7a102..06c956a1 100644
--- a/sail/pkg/serviceregistry/kube/controller/multicluster.go
+++ b/sail/pkg/serviceregistry/kube/controller/multicluster.go
@@ -7,15 +7,13 @@ import (
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
        "github.com/apache/dubbo-kubernetes/sail/pkg/server"
        "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
-       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/serviceentry"
        "k8s.io/klog/v2"
 )
 
 type kubeController struct {
        MeshServiceController *aggregate.Controller
        *Controller
-       workloadEntryController *serviceentry.Controller
-       stop                    chan struct{}
+       stop chan struct{}
 }
 
 func (k *kubeController) Close() {
diff --git a/sail/pkg/serviceregistry/kube/controller/network.go b/sail/pkg/serviceregistry/kube/controller/network.go
new file mode 100644
index 00000000..32a67988
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/network.go
@@ -0,0 +1,56 @@
+package controller
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/cluster"
+       "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+       "github.com/apache/dubbo-kubernetes/pkg/network"
+       v1 "k8s.io/api/core/v1"
+       "sync"
+)
+
+type networkManager struct {
+       sync.RWMutex
+       clusterID             cluster.ID
+       meshNetworksWatcher   mesh.NetworksWatcher
+       network               network.ID
+       networkFromMeshConfig network.ID
+}
+
+func initNetworkManager(c *Controller, options Options) *networkManager {
+       n := &networkManager{
+               clusterID:             options.ClusterID,
+               meshNetworksWatcher:   options.MeshNetworksWatcher,
+               network:               "",
+               networkFromMeshConfig: "",
+       }
+       return n
+}
+
+func (n *networkManager) networkFromSystemNamespace() network.ID {
+       n.RLock()
+       defer n.RUnlock()
+       return n.network
+}
+
+func (n *networkManager) networkFromMeshNetworks(endpointIP string) network.ID {
+       n.RLock()
+       defer n.RUnlock()
+       if n.networkFromMeshConfig != "" {
+               return n.networkFromMeshConfig
+       }
+
+       return ""
+}
+
+func (n *networkManager) HasSynced() bool {
+       return true
+}
+
+func (n *networkManager) setNetworkFromNamespace(ns *v1.Namespace) bool {
+       nw := ns.Labels["topology.dubbo.io/network"]
+       n.Lock()
+       defer n.Unlock()
+       oldDefaultNetwork := n.network
+       n.network = network.ID(nw)
+       return oldDefaultNetwork != n.network
+}
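
Illustrative sketch (not part of the patch): the manager's default network comes from the topology.dubbo.io/network label on the system namespace, as in setNetworkFromNamespace above. The namespace fixture below is hypothetical.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// defaultNetworkFor reads the same topology.dubbo.io/network label the
// networkManager watches on the system namespace.
func defaultNetworkFor(ns *corev1.Namespace) string {
    return ns.Labels["topology.dubbo.io/network"]
}

func main() {
    ns := &corev1.Namespace{
        ObjectMeta: metav1.ObjectMeta{
            Name:   "dubbo-system",
            Labels: map[string]string{"topology.dubbo.io/network": "network-1"},
        },
    }
    fmt.Println(defaultNetworkFor(ns)) // network-1
}
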
diff --git a/sail/pkg/serviceregistry/kube/controller/pod.go b/sail/pkg/serviceregistry/kube/controller/pod.go
new file mode 100644
index 00000000..bef04660
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/pod.go
@@ -0,0 +1,252 @@
+package controller
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+       "github.com/apache/dubbo-kubernetes/pkg/maps"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "istio.io/api/annotation"
+       v1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/types"
+       "sync"
+)
+
+type PodCache struct {
+       pods kclient.Client[*v1.Pod]
+
+       sync.RWMutex
+       // podsByIP maintains a stable pod IP to name key mapping;
+       // this allows us to retrieve the latest status by pod IP.
+       // This should only contain RUNNING or PENDING pods with an allocated IP.
+       podsByIP map[string]sets.Set[types.NamespacedName]
+       // ipByPods is a reverse map of podsByIP. This exists to allow us to prune stale entries in the
+       // pod cache if a pod changes IP.
+       ipByPods map[types.NamespacedName]string
+
+       // needResync is a map of IP to endpoint namespace/name. It is used to requeue endpoint
+       // events when the corresponding pod event arrives. This typically happens when the pod is not
+       // yet available in the pod cache when the endpoint event arrives.
+       needResync         map[string]sets.Set[types.NamespacedName]
+       queueEndpointEvent func(types.NamespacedName)
+
+       c *Controller
+}
+
+func newPodCache(c *Controller, pods kclient.Client[*v1.Pod], queueEndpointEvent func(types.NamespacedName)) *PodCache {
+       out := &PodCache{
+               pods:               pods,
+               c:                  c,
+               podsByIP:           make(map[string]sets.Set[types.NamespacedName]),
+               ipByPods:           make(map[types.NamespacedName]string),
+               needResync:         make(map[string]sets.Set[types.NamespacedName]),
+               queueEndpointEvent: queueEndpointEvent,
+       }
+
+       return out
+}
+
+func (pc *PodCache) endpointDeleted(key types.NamespacedName, ip string) {
+       pc.Lock()
+       defer pc.Unlock()
+       sets.DeleteCleanupLast(pc.needResync, ip, key)
+}
+
+func (pc *PodCache) getPodByKey(key types.NamespacedName) *v1.Pod {
+       return pc.pods.Get(key.Name, key.Namespace)
+}
+
+func (pc *PodCache) getPodKeys(addr string) []types.NamespacedName {
+       pc.RLock()
+       defer pc.RUnlock()
+       return pc.podsByIP[addr].UnsortedList()
+}
+
+func (pc *PodCache) getPodsByIP(addr string) []*v1.Pod {
+       keys := pc.getPodKeys(addr)
+       if keys == nil {
+               return nil
+       }
+       res := make([]*v1.Pod, 0, len(keys))
+       for _, key := range keys {
+               p := pc.getPodByKey(key)
+               // Subtle race condition: getPodKeys is our cache over pods, while getPodByKey hits the informer cache.
+               // If these are out of sync, p may be nil (the pod was deleted).
+               if p != nil {
+                       res = append(res, p)
+               }
+       }
+       return res
+}
+
+func (pc *PodCache) queueEndpointEventOnPodArrival(key types.NamespacedName, ip string) {
+       pc.Lock()
+       defer pc.Unlock()
+       sets.InsertOrNew(pc.needResync, ip, key)
+}
+
+func (pc *PodCache) getIPByPod(key types.NamespacedName) string {
+       pc.RLock()
+       defer pc.RUnlock()
+       return pc.ipByPods[key]
+}
+
+func (pc *PodCache) onEvent(old, pod *v1.Pod, ev model.Event) error {
+       ip := pod.Status.PodIP
+       // PodIP will be empty when the pod has just been created, but before the IP is assigned
+       // via UpdateStatus.
+       if len(ip) == 0 {
+               // However, in the case of an Eviction, the event that marks the pod as Failed may *also* have removed the IP.
+               // If the pod *used to* have an IP, then we need to actually delete it.
+               ip = pc.getIPByPod(config.NamespacedName(pod))
+               if len(ip) == 0 {
+                       return nil
+               }
+       }
+
+       key := config.NamespacedName(pod)
+       switch ev {
+       case model.EventAdd:
+               if shouldPodBeInEndpoints(pod) && IsPodReady(pod) {
+                       pc.addPod(pod, ip, key, false)
+               } else {
+                       return nil
+               }
+       case model.EventUpdate:
+               if !shouldPodBeInEndpoints(pod) || !IsPodReady(pod) {
+                       // delete only if this pod was in the cache
+                       if !pc.deleteIP(ip, key) {
+                               return nil
+                       }
+                       ev = model.EventDelete
+               } else if shouldPodBeInEndpoints(pod) && IsPodReady(pod) {
+                       labelUpdated := pc.labelFilter(old, pod)
+                       pc.addPod(pod, ip, key, labelUpdated)
+               } else {
+                       return nil
+               }
+       case model.EventDelete:
+               // delete only if this pod was in the cache,
+               // in most cases it has already been deleted in `UPDATE` with `DeletionTimestamp` set.
+               if !pc.deleteIP(ip, key) {
+                       return nil
+               }
+       }
+       // TODO notifyWorkloadHandlers
+       return nil
+}
+
+func (pc *PodCache) addPod(pod *v1.Pod, ip string, key types.NamespacedName, labelUpdated bool) {
+       pc.Lock()
+       // if the pod has been cached, return
+       if pc.podsByIP[ip].Contains(key) {
+               pc.Unlock()
+               if labelUpdated {
+                       pc.proxyUpdates(pod, true)
+               }
+               return
+       }
+       if current, f := pc.ipByPods[key]; f {
+               // The pod already exists, but with another IP address. We need to clean that up.
+               sets.DeleteCleanupLast(pc.podsByIP, current, key)
+       }
+       sets.InsertOrNew(pc.podsByIP, ip, key)
+       pc.ipByPods[key] = ip
+
+       if endpointsToUpdate, f := pc.needResync[ip]; f {
+               delete(pc.needResync, ip)
+               for epKey := range endpointsToUpdate {
+                       pc.queueEndpointEvent(epKey)
+               }
+       }
+       pc.Unlock()
+
+       pc.proxyUpdates(pod, false)
+}
+
+func (pc *PodCache) deleteIP(ip string, podKey types.NamespacedName) bool {
+       pc.Lock()
+       defer pc.Unlock()
+       if pc.podsByIP[ip].Contains(podKey) {
+               sets.DeleteCleanupLast(pc.podsByIP, ip, podKey)
+               delete(pc.ipByPods, podKey)
+               return true
+       }
+       return false
+}
+
+func (pc *PodCache) labelFilter(old, cur *v1.Pod) bool {
+       // If labels/annotations updated, trigger proxy push
+       labelsChanged := !maps.Equal(old.Labels, cur.Labels)
+       // Annotations are only used in endpoints in one case, so just compare that one
+       relevantAnnotationsChanged := old.Annotations[annotation.AmbientRedirection.Name] != cur.Annotations[annotation.AmbientRedirection.Name]
+       changed := labelsChanged || relevantAnnotationsChanged
+       return changed
+}
+
+func (pc *PodCache) proxyUpdates(pod *v1.Pod, isPodUpdate bool) {
+       if pc.c != nil {
+               if pc.c.opts.XDSUpdater != nil {
+                       ip := pod.Status.PodIP
+                       pc.c.opts.XDSUpdater.ProxyUpdate(pc.c.Cluster(), ip)
+               }
+               if isPodUpdate {
+                       // Recompute service(s) due to pod label change.
+                       // If it is a new pod, there is no need to recompute, as it has not been computed for the first time yet.
+                       pc.c.recomputeServiceForPod(pod)
+               }
+       }
+}
+
+func shouldPodBeInEndpoints(pod *v1.Pod) bool {
+       if isPodPhaseTerminal(pod.Status.Phase) {
+               return false
+       }
+
+       if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
+               return false
+       }
+
+       if pod.DeletionTimestamp != nil {
+               return false
+       }
+
+       return true
+}
+
+func isPodPhaseTerminal(phase v1.PodPhase) bool {
+       return phase == v1.PodFailed || phase == v1.PodSucceeded
+}
+
+func IsPodReady(pod *v1.Pod) bool {
+       return IsPodReadyConditionTrue(pod.Status)
+}
+
+func IsPodReadyConditionTrue(status v1.PodStatus) bool {
+       condition := GetPodReadyCondition(status)
+       return condition != nil && condition.Status == v1.ConditionTrue
+}
+
+func GetPodReadyCondition(status v1.PodStatus) *v1.PodCondition {
+       _, condition := GetPodCondition(&status, v1.PodReady)
+       return condition
+}
+
+func GetPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
+       if status == nil {
+               return -1, nil
+       }
+       return GetPodConditionFromList(status.Conditions, conditionType)
+}
+
+func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
+       if conditions == nil {
+               return -1, nil
+       }
+       for i := range conditions {
+               if conditions[i].Type == conditionType {
+                       return i, &conditions[i]
+               }
+       }
+       return -1, nil
+}
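
Illustrative sketch (not part of the patch): a compact, standalone combination of shouldPodBeInEndpoints and IsPodReady, with the helpers folded into one function so the snippet compiles on its own against only corev1.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
)

// eligible mirrors shouldPodBeInEndpoints && IsPodReady: a non-terminal pod with an IP,
// not being deleted, whose Ready condition is True.
func eligible(pod *corev1.Pod) bool {
    if pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded {
        return false
    }
    if pod.Status.PodIP == "" && len(pod.Status.PodIPs) == 0 {
        return false
    }
    if pod.DeletionTimestamp != nil {
        return false
    }
    for _, c := range pod.Status.Conditions {
        if c.Type == corev1.PodReady {
            return c.Status == corev1.ConditionTrue
        }
    }
    return false
}

func main() {
    pod := &corev1.Pod{
        Status: corev1.PodStatus{
            Phase: corev1.PodRunning,
            PodIP: "10.0.0.7",
            Conditions: []corev1.PodCondition{
                {Type: corev1.PodReady, Status: corev1.ConditionTrue},
            },
        },
    }
    fmt.Println(eligible(pod)) // true
}
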
diff --git a/sail/pkg/serviceregistry/kube/controller/util.go b/sail/pkg/serviceregistry/kube/controller/util.go
index f8c5ecaf..cc9db054 100644
--- a/sail/pkg/serviceregistry/kube/controller/util.go
+++ b/sail/pkg/serviceregistry/kube/controller/util.go
@@ -2,6 +2,8 @@ package controller
 
 import (
        "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+       v1 "k8s.io/api/core/v1"
        klabels "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/selection"
        "k8s.io/apimachinery/pkg/util/validation/field"
@@ -14,3 +16,14 @@ func labelRequirement(key string, op selection.Operator, vals []string, opts ...
        }
        return out
 }
+
+func getPodServices(allServices []*v1.Service, pod *v1.Pod) []*v1.Service {
+       var services []*v1.Service
+       for _, service := range allServices {
+               if labels.Instance(service.Spec.Selector).Match(pod.Labels) {
+                       services = append(services, service)
+               }
+       }
+
+       return services
+}
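
Illustrative sketch (not part of the patch): getPodServices matches a Service's spec.selector against pod labels. The same check, using only corev1 and the apimachinery label helpers, with hypothetical fixtures:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    klabels "k8s.io/apimachinery/pkg/labels"
)

func main() {
    svc := &corev1.Service{Spec: corev1.ServiceSpec{Selector: map[string]string{"app": "demo"}}}
    pod := &corev1.Pod{}
    pod.Labels = map[string]string{"app": "demo", "version": "v1"}

    // A Service selects a Pod when every selector key/value is present in the pod's labels.
    matches := klabels.SelectorFromSet(svc.Spec.Selector).Matches(klabels.Set(pod.Labels))
    fmt.Println(matches) // true
}
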
diff --git a/sail/pkg/serviceregistry/kube/conversion.go b/sail/pkg/serviceregistry/kube/conversion.go
index 1d616bd1..68072953 100644
--- a/sail/pkg/serviceregistry/kube/conversion.go
+++ b/sail/pkg/serviceregistry/kube/conversion.go
@@ -150,3 +150,7 @@ func convertPort(port corev1.ServicePort) *model.Port {
 func kubeToDubboServiceAccount(saname string, ns string, mesh *meshconfig.MeshConfig) string {
        return spiffe.MustGenSpiffeURI(mesh, ns, saname)
 }
+
+func SecureNamingSAN(pod *corev1.Pod, mesh *meshconfig.MeshConfig) string {
+       return spiffe.MustGenSpiffeURI(mesh, pod.Namespace, pod.Spec.ServiceAccountName)
+}
diff --git a/sail/pkg/serviceregistry/serviceentry/controller.go b/sail/pkg/serviceregistry/serviceentry/controller.go
deleted file mode 100644
index 5d116e7c..00000000
--- a/sail/pkg/serviceregistry/serviceentry/controller.go
+++ /dev/null
@@ -1,4 +0,0 @@
-package serviceentry
-
-type Controller struct {
-}
diff --git a/sail/pkg/serviceregistry/util/label/label.go b/sail/pkg/serviceregistry/util/label/label.go
new file mode 100644
index 00000000..3eaecc5b
--- /dev/null
+++ b/sail/pkg/serviceregistry/util/label/label.go
@@ -0,0 +1,60 @@
+package label
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/cluster"
+       "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+       "github.com/apache/dubbo-kubernetes/pkg/network"
+       "istio.io/api/label"
+       "strings"
+)
+
+const (
+       LabelHostname = "kubernetes.io/hostname"
+
+       LabelTopologyZone    = "topology.kubernetes.io/zone"
+       LabelTopologySubzone = "topology.dubbo.io/subzone"
+       LabelTopologyRegion  = "topology.kubernetes.io/region"
+)
+
+func AugmentLabels(in labels.Instance, clusterID cluster.ID, locality, k8sNode string, networkID network.ID) labels.Instance {
+       // Copy the original labels to a new map.
+       out := make(labels.Instance, len(in)+6)
+       for k, v := range in {
+               out[k] = v
+       }
+
+       region, zone, subzone := SplitLocalityLabel(locality)
+       if len(region) > 0 {
+               out[LabelTopologyRegion] = region
+       }
+       if len(zone) > 0 {
+               out[LabelTopologyZone] = zone
+       }
+       if len(subzone) > 0 {
+               out[label.TopologySubzone.Name] = subzone
+       }
+       if len(clusterID) > 0 {
+               out[label.TopologyCluster.Name] = clusterID.String()
+       }
+       if len(k8sNode) > 0 {
+               out[LabelHostname] = k8sNode
+       }
+       // In c.Network(), we already set the network label in priority order pod labels > namespace label > mesh Network
+       // We won't let proxy.Metadata.Network override the above.
+       if len(networkID) > 0 && out[label.TopologyNetwork.Name] == "" {
+               out[label.TopologyNetwork.Name] = networkID.String()
+       }
+       return out
+}
+
+func SplitLocalityLabel(locality string) (region, zone, subzone string) {
+       items := strings.Split(locality, "/")
+       switch len(items) {
+       case 1:
+               return items[0], "", ""
+       case 2:
+               return items[0], items[1], ""
+       default:
+               return items[0], items[1], items[2]
+       }
+}
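
Illustrative sketch (not part of the patch): the locality string handled by SplitLocalityLabel is the usual region/zone/subzone form; the split logic is duplicated locally so the snippet runs standalone.

package main

import (
    "fmt"
    "strings"
)

// splitLocality mirrors SplitLocalityLabel: "region/zone/subzone", with missing
// trailing parts returned as empty strings.
func splitLocality(locality string) (region, zone, subzone string) {
    parts := strings.Split(locality, "/")
    switch len(parts) {
    case 1:
        return parts[0], "", ""
    case 2:
        return parts[0], parts[1], ""
    default:
        return parts[0], parts[1], parts[2]
    }
}

func main() {
    r, z, s := splitLocality("us-east-1/us-east-1a/rack-7")
    fmt.Println(r, z, s) // us-east-1 us-east-1a rack-7
}
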
diff --git a/sail/pkg/xds/ads.go b/sail/pkg/xds/ads.go
index 9d287319..f5bbd7d1 100644
--- a/sail/pkg/xds/ads.go
+++ b/sail/pkg/xds/ads.go
@@ -105,11 +105,8 @@ func (s *DiscoveryServer) adsClientCount() int {
        return len(s.adsClients)
 }
 
-func (s *DiscoveryServer) initProxyMetadata(node *core.Node) (*model.Proxy, error) {
-       return nil, nil
-}
-
 func (s *DiscoveryServer) initConnection(node *core.Node, con *Connection, identities []string) error {
+
        return nil
 }
 
@@ -129,6 +126,7 @@ func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
        if peerInfo, ok := peer.FromContext(ctx); ok {
                peerAddr = peerInfo.Addr.String()
        }
+
        // TODO WaitForRequestLimit?
 
        if err := s.WaitForRequestLimit(stream.Context()); err != nil {
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index 6ca0ea4a..636bedf7 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -164,6 +164,46 @@ func (s *DiscoveryServer) IsServerReady() bool {
        return s.serverReady.Load()
 }
 
+func (s *DiscoveryServer) ProxyUpdate(clusterID cluster.ID, ip string) {
+       var connection *Connection
+
+       for _, v := range s.Clients() {
+               if v.proxy.Metadata.ClusterID == clusterID && v.proxy.IPAddresses[0] == ip {
+                       connection = v
+                       break
+               }
+       }
+
+       // It is possible that the Envoy has not connected to this pilot; it may be connected to another pilot.
+       if connection == nil {
+               return
+       }
+
+       s.pushQueue.Enqueue(connection, &model.PushRequest{
+               Full:   true,
+               Push:   s.globalPushContext(),
+               Start:  time.Now(),
+               Reason: model.NewReasonStats(model.ProxyUpdate),
+               Forced: true,
+       })
+}
+
+func (s *DiscoveryServer) Clients() []*Connection {
+       s.adsClientsMutex.RLock()
+       defer s.adsClientsMutex.RUnlock()
+       clients := make([]*Connection, 0, len(s.adsClients))
+       for _, con := range s.adsClients {
+               select {
+               case <-con.InitializedCh():
+               default:
+                       // Initialization not complete, skip
+                       continue
+               }
+               clients = append(clients, con)
+       }
+       return clients
+}
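
Illustrative sketch (not part of the patch): Clients() skips connections whose initialization channel has not been closed yet, using a non-blocking receive. The conn type below is a stand-in, not the server's Connection.

package main

import "fmt"

// conn is a stand-in for an xDS connection with an "initialized" channel that is
// closed exactly once when the handshake completes.
type conn struct {
    id          string
    initialized chan struct{}
}

func ready(c *conn) bool {
    select {
    case <-c.initialized: // closed channel: receive succeeds immediately
        return true
    default: // still open: skip without blocking
        return false
    }
}

func main() {
    a := &conn{id: "a", initialized: make(chan struct{})}
    b := &conn{id: "b", initialized: make(chan struct{})}
    close(b.initialized)

    for _, c := range []*conn{a, b} {
        if ready(c) {
            fmt.Println("push candidate:", c.id)
        }
    }
    // prints only: push candidate: b
}
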
+
 func reasonsUpdated(req *model.PushRequest) string {
        var (
                reason0, reason1            model.TriggerReason
diff --git a/sail/pkg/xds/eds.go b/sail/pkg/xds/eds.go
index d2afda10..b35b3630 100644
--- a/sail/pkg/xds/eds.go
+++ b/sail/pkg/xds/eds.go
@@ -1,8 +1,12 @@
 package xds
 
-import "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+)
 
-func (s *DiscoveryServer) SvcUpdate(shard model.ShardKey, hostname string, namespace string, event model.Event) {
+func (s *DiscoveryServer) ServiceUpdate(shard model.ShardKey, hostname string, namespace string, event model.Event) {
        // When a service is deleted, we should clean up the endpoint shards and also remove keys from EndpointIndex to
        // prevent memory leaks.
        if event == model.EventDelete {
@@ -10,3 +14,25 @@ func (s *DiscoveryServer) SvcUpdate(shard model.ShardKey, hostname string, names
        } else {
        }
 }
+
+func (s *DiscoveryServer) EDSUpdate(shard model.ShardKey, serviceName string, namespace string,
+       dubboEndpoints []*model.DubboEndpoint,
+) {
+       // Update the endpoint shards
+       pushType := s.Env.EndpointIndex.UpdateServiceEndpoints(shard, serviceName, namespace, dubboEndpoints, true)
+       if pushType == model.IncrementalPush || pushType == model.FullPush {
+               // Trigger a push
+               s.ConfigUpdate(&model.PushRequest{
+                       Full:           pushType == model.FullPush,
+                       ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: serviceName, Namespace: namespace}),
+                       Reason:         model.NewReasonStats(model.EndpointUpdate),
+               })
+       }
+}
+
+func (s *DiscoveryServer) EDSCacheUpdate(shard model.ShardKey, serviceName string, namespace string,
+       istioEndpoints []*model.DubboEndpoint,
+) {
+       // Update the endpoint shards
+       s.Env.EndpointIndex.UpdateServiceEndpoints(shard, serviceName, namespace, istioEndpoints, false)
+}
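
Illustrative sketch (not part of the patch): EDSUpdate turns the index's verdict into a push request, while EDSCacheUpdate only refreshes the cache. The pushType and pushRequest types below are simplified stand-ins for the model package's types; the mapping mirrors the code above (skip when there is nothing to push, set Full only for a full push).

package main

import "fmt"

type pushType int

const (
    noPush pushType = iota
    incrementalPush
    fullPush
)

// pushRequest is a simplified stand-in for model.PushRequest.
type pushRequest struct {
    Full    bool
    Configs []string
}

// toPushRequest mirrors the shape of EDSUpdate: only incremental/full results
// produce a request, and Full is set from the index's verdict.
func toPushRequest(t pushType, serviceName, namespace string) *pushRequest {
    if t != incrementalPush && t != fullPush {
        return nil
    }
    return &pushRequest{
        Full:    t == fullPush,
        Configs: []string{namespace + "/" + serviceName},
    }
}

func main() {
    fmt.Println(toPushRequest(noPush, "demo", "default"))          // <nil>: cache-only update, nothing queued
    fmt.Println(toPushRequest(incrementalPush, "demo", "default")) // &{false [default/demo]}
    fmt.Println(toPushRequest(fullPush, "demo", "default"))        // &{true [default/demo]}
}
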
diff --git a/sail/pkg/xds/xdsgen.go b/sail/pkg/xds/xdsgen.go
index b3c7986c..cc2a0598 100644
--- a/sail/pkg/xds/xdsgen.go
+++ b/sail/pkg/xds/xdsgen.go
@@ -31,6 +31,7 @@ func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req
                        ResourceNames: req.Delta.Subscribed,
                }
        }
+
        res, logdata, err := gen.Generate(con.proxy, w, req)
        info := ""
        if len(logdata.AdditionalInfo) > 0 {
