This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new e01c5471 [discovery] full push service xds update (#803)
e01c5471 is described below
commit e01c54719e31e9f602835909b4d86175ed106374
Author: Jian Zhong <[email protected]>
AuthorDate: Thu Oct 16 15:26:38 2025 +0800
[discovery] full push service xds update (#803)
---
Dockerfile | 2 +-
pkg/config/model.go | 5 +-
pkg/config/schema/gvk/resources.go | 3 +
pkg/config/schema/gvr/resources.go | 3 +
pkg/config/schema/kubeclient/resources.go | 11 +-
pkg/config/schema/kubetypes/resources.go | 2 +
pkg/kube/inject/inject.go | 236 ++++++++++++++++
pkg/kube/inject/watcher.go | 14 +-
pkg/kube/inject/webhook.go | 73 ++++-
pkg/kube/util.go | 35 +++
pkg/webhooks/validation/controller/controller.go | 3 +-
pkg/webhooks/webhookpatch.go | 3 +-
sail/pkg/bootstrap/server.go | 2 -
sail/pkg/model/push_context.go | 8 +-
sail/pkg/model/service.go | 28 +-
.../serviceregistry/kube/controller/controller.go | 154 ++++++++++-
.../kube/controller/endpoint_builder.go | 97 +++++++
.../kube/controller/endpointslice.go | 296 +++++++++++++++++++--
.../kube/controller/multicluster.go | 4 +-
.../pkg/serviceregistry/kube/controller/network.go | 56 ++++
sail/pkg/serviceregistry/kube/controller/pod.go | 252 ++++++++++++++++++
sail/pkg/serviceregistry/kube/controller/util.go | 13 +
sail/pkg/serviceregistry/kube/conversion.go | 4 +
.../pkg/serviceregistry/serviceentry/controller.go | 4 -
sail/pkg/serviceregistry/util/label/label.go | 60 +++++
sail/pkg/xds/ads.go | 6 +-
sail/pkg/xds/discovery.go | 40 +++
sail/pkg/xds/eds.go | 30 ++-
sail/pkg/xds/xdsgen.go | 1 +
29 files changed, 1364 insertions(+), 81 deletions(-)
diff --git a/Dockerfile b/Dockerfile
index 63b52fd0..d7f227e1 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -4,4 +4,4 @@ ENTRYPOINT ["./sail-discovery"]
FROM gcr.io/distroless/static:debug AS agent
COPY sail-agent .
-ENTRYPOINT ["./sail-agent"]
+ENTRYPOINT ["./sail-agent"]
\ No newline at end of file
diff --git a/pkg/config/model.go b/pkg/config/model.go
index 204feb77..71ad19d5 100644
--- a/pkg/config/model.go
+++ b/pkg/config/model.go
@@ -44,7 +44,6 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/util/protoconv"
- "istio.io/api/label"
)
// Meta is metadata attached to each configuration unit.
@@ -132,7 +131,7 @@ func (o ObjectWithCluster[T]) GetObjectKeyable() any {
}
func LabelsInRevision(lbls map[string]string, rev string) bool {
- configEnv, f := lbls[label.IoIstioRev.Name]
+ configEnv, f := lbls["dubbo.io/rev"]
if !f {
// This is a global object, and always included
return true
@@ -150,7 +149,7 @@ func LabelsInRevisionOrTags(lbls map[string]string, rev string, tags sets.Set[st
if LabelsInRevision(lbls, rev) {
return true
}
- configEnv := lbls[label.IoIstioRev.Name]
+ configEnv := lbls["dubbo.io/rev"]
// Otherwise, only return true if revisions equal
return tags.Contains(configEnv)
}
diff --git a/pkg/config/schema/gvk/resources.go b/pkg/config/schema/gvk/resources.go
index 36dc2a91..b093fc04 100644
--- a/pkg/config/schema/gvk/resources.go
+++ b/pkg/config/schema/gvk/resources.go
@@ -35,6 +35,7 @@ var (
ConfigMap = config.GroupVersionKind{Group: "",
Version: "v1", Kind: "ConfigMap"}
Secret = config.GroupVersionKind{Group: "",
Version: "v1", Kind: "Secret"}
Service = config.GroupVersionKind{Group: "",
Version: "v1", Kind: "Service"}
+ Pod = config.GroupVersionKind{Group: "",
Version: "v1", Kind: "Pod"}
ServiceAccount = config.GroupVersionKind{Group: "",
Version: "v1", Kind: "ServiceAccount"}
MeshConfig = config.GroupVersionKind{Group: "",
Version: "v1alpha1", Kind: "MeshConfig"}
RequestAuthentication = config.GroupVersionKind{Group:
"security.dubbo.io", Version: "v1", Kind: "RequestAuthentication"}
@@ -88,6 +89,8 @@ func ToGVR(g config.GroupVersionKind) (schema.GroupVersionResource, bool) {
return gvr.EndpointSlice, true
case Endpoints:
return gvr.Endpoints, true
+ case Pod:
+ return gvr.Pod, true
}
return schema.GroupVersionResource{}, false
}
diff --git a/pkg/config/schema/gvr/resources.go b/pkg/config/schema/gvr/resources.go
index 86f1d054..48d27b81 100644
--- a/pkg/config/schema/gvr/resources.go
+++ b/pkg/config/schema/gvr/resources.go
@@ -42,6 +42,7 @@ var (
VirtualService = schema.GroupVersionResource{Group:
"networking.dubbo.io", Version: "v1", Resource: "virtualservices"}
EndpointSlice = schema.GroupVersionResource{Group:
"discovery.k8s.io", Version: "v1", Resource: "endpointslices"}
Endpoints = schema.GroupVersionResource{Group: "",
Version: "v1", Resource: "endpoints"}
+ Pod = schema.GroupVersionResource{Group: "",
Version: "v1", Resource: "pods"}
)
func IsClusterScoped(g schema.GroupVersionResource) bool {
@@ -80,6 +81,8 @@ func IsClusterScoped(g schema.GroupVersionResource) bool {
return false
case Endpoints:
return false
+ case Pod:
+ return false
}
return false
}
diff --git a/pkg/config/schema/kubeclient/resources.go b/pkg/config/schema/kubeclient/resources.go
index ff003d02..de7912eb 100644
--- a/pkg/config/schema/kubeclient/resources.go
+++ b/pkg/config/schema/kubeclient/resources.go
@@ -78,7 +78,6 @@ func GetWriteClient[T runtime.Object](c ClientGetter, namespace string) ktypes.W
return
c.Dubbo().NetworkingV1().VirtualServices(namespace).(ktypes.WriteAPI[T])
case *apiistioioapinetworkingv1.DestinationRule:
return
c.Dubbo().NetworkingV1().DestinationRules(namespace).(ktypes.WriteAPI[T])
-
default:
panic(fmt.Sprintf("Unknown type %T", ptr.Empty[T]()))
}
@@ -108,6 +107,8 @@ func gvrToObject(g schema.GroupVersionResource) runtime.Object {
return &k8sioapicorev1.ServiceAccount{}
case gvr.StatefulSet:
return &k8sioapiappsv1.StatefulSet{}
+ case gvr.Pod:
+ return &k8sioapicorev1.Pod{}
case gvr.MutatingWebhookConfiguration:
return
&k8sioapiadmissionregistrationv1.MutatingWebhookConfiguration{}
case gvr.ValidatingWebhookConfiguration:
@@ -120,6 +121,7 @@ func gvrToObject(g schema.GroupVersionResource) runtime.Object {
return &apiistioioapinetworkingv1.VirtualService{}
case gvr.DestinationRule:
return &apiistioioapinetworkingv1.DestinationRule{}
+
default:
panic(fmt.Sprintf("Unknown type %v", g))
}
@@ -249,6 +251,13 @@ func getInformerFiltered(c ClientGetter, opts ktypes.InformerOptions, g schema.G
w = func(options metav1.ListOptions) (watch.Interface, error) {
return
c.Dubbo().SecurityV1().RequestAuthentications(opts.Namespace).Watch(context.Background(),
options)
}
+ case gvr.Pod:
+ l = func(options metav1.ListOptions) (runtime.Object, error) {
+ return
c.Kube().CoreV1().Pods(opts.Namespace).List(context.Background(), options)
+ }
+ w = func(options metav1.ListOptions) (watch.Interface, error) {
+ return
c.Kube().CoreV1().Pods(opts.Namespace).Watch(context.Background(), options)
+ }
default:
panic(fmt.Sprintf("Unknown type %v", g))
}
diff --git a/pkg/config/schema/kubetypes/resources.go b/pkg/config/schema/kubetypes/resources.go
index 562e8a54..bbe30e9b 100644
--- a/pkg/config/schema/kubetypes/resources.go
+++ b/pkg/config/schema/kubetypes/resources.go
@@ -56,6 +56,8 @@ func getGvk(obj any) (config.GroupVersionKind, bool) {
return gvk.Endpoints, true
case *k8sioapicorev1.Service:
return gvk.Service, true
+ case *k8sioapicorev1.Pod:
+ return gvk.Pod, true
default:
return config.GroupVersionKind{}, false
}
diff --git a/pkg/kube/inject/inject.go b/pkg/kube/inject/inject.go
index d2042f99..0989dae8 100644
--- a/pkg/kube/inject/inject.go
+++ b/pkg/kube/inject/inject.go
@@ -1,12 +1,21 @@
package inject
import (
+ "bytes"
+ "encoding/json"
"fmt"
"github.com/Masterminds/sprig/v3"
+ opconfig "github.com/apache/dubbo-kubernetes/operator/pkg/apis"
+ common_features "github.com/apache/dubbo-kubernetes/pkg/features"
+ "istio.io/api/annotation"
+ meshconfig "istio.io/api/mesh/v1alpha1"
+ proxyConfig "istio.io/api/networking/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"
+ "strings"
"text/template"
)
@@ -24,6 +33,29 @@ type Config struct {
Aliases map[string][]string `json:"aliases"`
}
+type ProxylessTemplateData struct {
+ TypeMeta metav1.TypeMeta
+ DeploymentMeta types.NamespacedName
+ ObjectMeta metav1.ObjectMeta
+ Spec corev1.PodSpec
+ ProxyConfig *meshconfig.ProxyConfig
+ MeshConfig *meshconfig.MeshConfig
+ Values map[string]any
+ Revision string
+ NativeSidecars bool
+ ProxyImage string
+ InboundTrafficPolicyMode string
+ CompliancePolicy string
+}
+
+type ProxylessInjectionStatus struct {
+ InitContainers []string `json:"initContainers"`
+ Containers []string `json:"containers"`
+ Volumes []string `json:"volumes"`
+ ImagePullSecrets []string `json:"imagePullSecrets"`
+ Revision string `json:"revision"`
+}
+
func UnmarshalConfig(yml []byte) (Config, error) {
var injectConfig Config
if err := yaml.Unmarshal(yml, &injectConfig); err != nil {
@@ -61,5 +93,209 @@ func potentialPodName(metadata metav1.ObjectMeta) string {
}
func RunTemplate(params InjectionParameters) (mergedPod *corev1.Pod,
templatePod *corev1.Pod, err error) {
+ meshConfig := params.meshConfig
+
+ strippedPod, err := reinsertOverrides(stripPod(params))
+ if err != nil {
+ return nil, nil, err
+ }
+
+ data := ProxylessTemplateData{
+ TypeMeta: params.typeMeta,
+ DeploymentMeta: params.deployMeta,
+ ObjectMeta: strippedPod.ObjectMeta,
+ Spec: strippedPod.Spec,
+ ProxyConfig: params.proxyConfig,
+ MeshConfig: meshConfig,
+ Values: params.valuesConfig.asMap,
+ Revision: params.revision,
+ ProxyImage: ProxyImage(params.valuesConfig.asStruct,
params.proxyConfig.Image, strippedPod.Annotations),
+ CompliancePolicy: common_features.CompliancePolicy,
+ }
+
+ mergedPod = params.pod
+ templatePod = &corev1.Pod{}
+
+ for _, templateName := range selectTemplates(params) {
+ parsedTemplate, f := params.templates[templateName]
+ if !f {
+ return nil, nil, fmt.Errorf("requested template %q not
found; have %v",
+ templateName,
strings.Join(knownTemplates(params.templates), ", "))
+ }
+ bbuf, err := runTemplate(parsedTemplate, data)
+ if err != nil {
+ return nil, nil, err
+ }
+ templatePod, err = applyOverlayYAML(templatePod, bbuf.Bytes())
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed applying injection
overlay: %v", err)
+ }
+ mergedPod, err = applyOverlayYAML(mergedPod, bbuf.Bytes())
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed parsing generated
injected YAML (check Istio sidecar injector configuration): %v", err)
+ }
+ }
+
return mergedPod, templatePod, nil
}
+
+func knownTemplates(t Templates) []string {
+ keys := make([]string, 0, len(t))
+ for k := range t {
+ keys = append(keys, k)
+ }
+ return keys
+}
+
+func runTemplate(tmpl *template.Template, data ProxylessTemplateData)
(bytes.Buffer, error) {
+ var res bytes.Buffer
+ if err := tmpl.Execute(&res, &data); err != nil {
+ klog.Errorf("Invalid template: %v", err)
+ return bytes.Buffer{}, err
+ }
+
+ return res, nil
+}
+
+func selectTemplates(params InjectionParameters) []string {
+ if a, f := params.pod.Annotations[annotation.InjectTemplates.Name]; f {
+ names := []string{}
+ for _, tmplName := range strings.Split(a, ",") {
+ name := strings.TrimSpace(tmplName)
+ names = append(names, name)
+ }
+ return resolveAliases(params, names)
+ }
+ return resolveAliases(params, params.defaultTemplate)
+}
+
+func resolveAliases(params InjectionParameters, names []string) []string {
+ ret := []string{}
+ for _, name := range names {
+ if al, f := params.aliases[name]; f {
+ ret = append(ret, al...)
+ } else {
+ ret = append(ret, name)
+ }
+ }
+ return ret
+}
+
+func stripPod(req InjectionParameters) *corev1.Pod {
+ pod := req.pod.DeepCopy()
+ prevStatus := injectionStatus(pod)
+ if prevStatus == nil {
+ return req.pod
+ }
+ // We found a previous status annotation. Possibly we are re-injecting
the pod
+ // To ensure idempotency, remove our injected containers first
+ for _, c := range prevStatus.Containers {
+ pod.Spec.Containers = modifyContainers(pod.Spec.Containers, c,
Remove)
+ }
+ for _, c := range prevStatus.InitContainers {
+ pod.Spec.InitContainers =
modifyContainers(pod.Spec.InitContainers, c, Remove)
+ }
+
+ delete(pod.Annotations, annotation.SidecarStatus.Name)
+
+ return pod
+}
+
+type ContainerReorder int
+
+const (
+ MoveFirst ContainerReorder = iota
+ MoveLast
+ Remove
+)
+
+func modifyContainers(cl []corev1.Container, name string, modifier
ContainerReorder) []corev1.Container {
+ containers := []corev1.Container{}
+ var match *corev1.Container
+ for _, c := range cl {
+ if c.Name != name {
+ containers = append(containers, c)
+ } else {
+ match = &c
+ }
+ }
+ if match == nil {
+ return containers
+ }
+ switch modifier {
+ case MoveFirst:
+ return append([]corev1.Container{*match}, containers...)
+ case MoveLast:
+ return append(containers, *match)
+ case Remove:
+ return containers
+ default:
+ return cl
+ }
+}
+
+func injectionStatus(pod *corev1.Pod) *ProxylessInjectionStatus {
+ var statusBytes []byte
+ if pod.ObjectMeta.Annotations != nil {
+ if value, ok :=
pod.ObjectMeta.Annotations[annotation.SidecarStatus.Name]; ok {
+ statusBytes = []byte(value)
+ }
+ }
+ if statusBytes == nil {
+ return nil
+ }
+
+ // default case when injected pod has explicit status
+ var iStatus ProxylessInjectionStatus
+ if err := json.Unmarshal(statusBytes, &iStatus); err != nil {
+ return nil
+ }
+ return &iStatus
+}
+
+func reinsertOverrides(pod *corev1.Pod) (*corev1.Pod, error) {
+ type podOverrides struct {
+ Containers []corev1.Container `json:"containers,omitempty"`
+ InitContainers []corev1.Container
`json:"initContainers,omitempty"`
+ }
+
+ existingOverrides := podOverrides{}
+ if annotationOverrides, f :=
pod.Annotations[annotation.ProxyOverrides.Name]; f {
+ if err := json.Unmarshal([]byte(annotationOverrides),
&existingOverrides); err != nil {
+ return nil, err
+ }
+ }
+
+ pod = pod.DeepCopy()
+ for _, c := range existingOverrides.Containers {
+ match := FindContainer(c.Name, pod.Spec.Containers)
+ if match != nil {
+ continue
+ }
+ pod.Spec.Containers = append(pod.Spec.Containers, c)
+ }
+
+ for _, c := range existingOverrides.InitContainers {
+ match := FindContainer(c.Name, pod.Spec.InitContainers)
+ if match != nil {
+ continue
+ }
+ pod.Spec.InitContainers = append(pod.Spec.InitContainers, c)
+ }
+
+ return pod, nil
+}
+
+func FindContainer(name string, containers []corev1.Container)
*corev1.Container {
+ for i := range containers {
+ if containers[i].Name == name {
+ return &containers[i]
+ }
+ }
+ return nil
+}
+
+func ProxyImage(values *opconfig.Values, image *proxyConfig.ProxyImage,
annotations map[string]string) string {
+ imageName := "proxyxds"
+ return imageName
+}
diff --git a/pkg/kube/inject/watcher.go b/pkg/kube/inject/watcher.go
index e7988ef5..972ea539 100644
--- a/pkg/kube/inject/watcher.go
+++ b/pkg/kube/inject/watcher.go
@@ -61,13 +61,13 @@ func (w *fileWatcher) Run(stop <-chan struct{}) {
select {
case <-timerC:
timerC = nil
- sidecarConfig, valuesConfig, err := w.Get()
+ proxylessconfig, valuesConfig, err := w.Get()
if err != nil {
klog.Errorf("update error: %v", err)
break
}
if w.handler != nil {
- if err := w.handler(sidecarConfig,
valuesConfig); err != nil {
+ if err := w.handler(proxylessconfig,
valuesConfig); err != nil {
klog.Errorf("update error: %v", err)
}
}
@@ -108,12 +108,12 @@ func (w *configMapWatcher) Run(stop <-chan struct{}) {
}
func (w *configMapWatcher) Get() (*Config, string, error) {
- cms := w.client.Kube().CoreV1().ConfigMaps(w.namespace)
- cm, err := cms.Get(context.TODO(), w.name, metav1.GetOptions{})
+ configmaps := w.client.Kube().CoreV1().ConfigMaps(w.namespace)
+ configmap, err := configmaps.Get(context.TODO(), w.name,
metav1.GetOptions{})
if err != nil {
return nil, "", err
}
- return readConfigMap(cm, w.configKey, w.valuesKey)
+ return readConfigMap(configmap, w.configKey, w.valuesKey)
}
func NewConfigMapWatcher(client kube.Client, namespace, name, configKey,
valuesKey string) Watcher {
@@ -125,13 +125,13 @@ func NewConfigMapWatcher(client kube.Client, namespace, name, configKey, valuesK
valuesKey: valuesKey,
}
w.c = configmapwatcher.NewController(client, namespace, name, func(cm
*v1.ConfigMap) {
- sidecarConfig, valuesConfig, err := readConfigMap(cm,
configKey, valuesKey)
+ proxylessConfig, valuesConfig, err := readConfigMap(cm,
configKey, valuesKey)
if err != nil {
klog.Warningf("failed to read injection config from
ConfigMap: %v", err)
return
}
if w.handler != nil {
- if err := w.handler(sidecarConfig, valuesConfig); err
!= nil {
+ if err := w.handler(proxylessConfig, valuesConfig); err
!= nil {
klog.Errorf("update error: %v", err)
}
}
diff --git a/pkg/kube/inject/webhook.go b/pkg/kube/inject/webhook.go
index d8717c17..6f24befc 100644
--- a/pkg/kube/inject/webhook.go
+++ b/pkg/kube/inject/webhook.go
@@ -17,6 +17,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/mergepatch"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/klog/v2"
"net/http"
"os"
@@ -254,11 +256,6 @@ func injectPod(req InjectionParameters) ([]byte, error) {
return nil, fmt.Errorf("failed to re apply container: %v", err)
}
- // Apply some additional transformations to the pod
- if err := postProcessPod(mergedPod, *injectedPodData, req); err != nil {
- return nil, fmt.Errorf("failed to process pod: %v", err)
- }
-
patch, err := createPatch(mergedPod, originalPodSpec)
if err != nil {
return nil, fmt.Errorf("failed to create patch: %v", err)
@@ -267,7 +264,9 @@ func injectPod(req InjectionParameters) ([]byte, error) {
return patch, nil
}
-func reapplyOverwrittenContainers(finalPod *corev1.Pod, originalPod
*corev1.Pod, templatePod *corev1.Pod, proxyConfig *meshconfig.ProxyConfig)
(*corev1.Pod, error) {
+func reapplyOverwrittenContainers(finalPod *corev1.Pod, originalPod
*corev1.Pod, templatePod *corev1.Pod,
+ proxyConfig *meshconfig.ProxyConfig,
+) (*corev1.Pod, error) {
return finalPod, nil
}
@@ -283,15 +282,61 @@ func createPatch(pod *corev1.Pod, original []byte) ([]byte, error) {
return json.Marshal(p)
}
-func postProcessPod(pod *corev1.Pod, injectedPod corev1.Pod, req
InjectionParameters) error {
- if pod.Annotations == nil {
- pod.Annotations = map[string]string{}
+func applyOverlayYAML(target *corev1.Pod, overlayYAML []byte) (*corev1.Pod,
error) {
+ currentJSON, err := json.Marshal(target)
+ if err != nil {
+ return nil, err
}
- if pod.Labels == nil {
- pod.Labels = map[string]string{}
+
+ pod := corev1.Pod{}
+ // Overlay the injected template onto the original podSpec
+ patched, err := StrategicMergePatchYAML(currentJSON, overlayYAML, pod)
+ if err != nil {
+ return nil, fmt.Errorf("strategic merge: %v", err)
}
- return nil
+ if err := json.Unmarshal(patched, &pod); err != nil {
+ return nil, fmt.Errorf("unmarshal patched pod: %v", err)
+ }
+ return &pod, nil
+}
+
+func StrategicMergePatchYAML(originalJSON []byte, patchYAML []byte, dataStruct
any) ([]byte, error) {
+ schema, err := strategicpatch.NewPatchMetaFromStruct(dataStruct)
+ if err != nil {
+ return nil, err
+ }
+
+ originalMap, err := patchHandleUnmarshal(originalJSON, json.Unmarshal)
+ if err != nil {
+ return nil, err
+ }
+ patchMap, err := patchHandleUnmarshal(patchYAML, func(data []byte, v
any) error {
+ return yaml.Unmarshal(data, v)
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ result, err :=
strategicpatch.StrategicMergeMapPatchUsingLookupPatchMeta(originalMap,
patchMap, schema)
+ if err != nil {
+ return nil, err
+ }
+
+ return json.Marshal(result)
+}
+
+func patchHandleUnmarshal(j []byte, unmarshal func(data []byte, v any) error)
(map[string]any, error) {
+ if j == nil {
+ j = []byte("{}")
+ }
+
+ m := map[string]any{}
+ err := unmarshal(j, &m)
+ if err != nil {
+ return nil, mergepatch.ErrBadJSONDoc
+ }
+ return m, nil
}
func parseInjectEnvs(path string) map[string]string {
@@ -353,8 +398,8 @@ func (wh *Webhook) inject(ar *kube.AdmissionReview, path string) *kube.Admission
if pod.ObjectMeta.Namespace == "" {
pod.ObjectMeta.Namespace = req.Namespace
}
- klog.Infof("Namespace: %v podName: %s", pod.Namespace+"/"+podName)
- klog.Infof("Process proxyless injection request")
+ klog.Info(pod.Namespace + "/" + podName)
+ klog.Infof("path=%s Process proxyless injection request", path)
wh.mu.RLock()
proxyConfig := wh.env.GetProxyConfigOrDefault(pod.Namespace,
pod.Labels, pod.Annotations, wh.meshConfig)
diff --git a/pkg/kube/util.go b/pkg/kube/util.go
index 10309ed3..36ba6664 100644
--- a/pkg/kube/util.go
+++ b/pkg/kube/util.go
@@ -101,6 +101,41 @@ func SetRestDefaults(config *rest.Config) *rest.Config {
return config
}
+func StripPodUnusedFields(obj any) (any, error) {
+ t, ok := obj.(metav1.ObjectMetaAccessor)
+ if !ok {
+ // shouldn't happen
+ return obj, nil
+ }
+ // ManagedFields is large and we never use it
+ t.GetObjectMeta().SetManagedFields(nil)
+ // only container ports can be used
+ if pod := obj.(*corev1.Pod); pod != nil {
+ containers := []corev1.Container{}
+ for _, c := range pod.Spec.Containers {
+ if len(c.Ports) > 0 {
+ containers = append(containers,
corev1.Container{
+ Ports: c.Ports,
+ })
+ }
+ }
+ oldSpec := pod.Spec
+ newSpec := corev1.PodSpec{
+ Containers: containers,
+ ServiceAccountName: oldSpec.ServiceAccountName,
+ NodeName: oldSpec.NodeName,
+ HostNetwork: oldSpec.HostNetwork,
+ Hostname: oldSpec.Hostname,
+ Subdomain: oldSpec.Subdomain,
+ }
+ pod.Spec = newSpec
+ pod.Status.InitContainerStatuses = nil
+ pod.Status.ContainerStatuses = nil
+ }
+
+ return obj, nil
+}
+
const MaxRequestBodyBytes = int64(6 * 1024 * 1024)
func HTTPConfigReader(req *http.Request) ([]byte, error) {
diff --git a/pkg/webhooks/validation/controller/controller.go b/pkg/webhooks/validation/controller/controller.go
index 169f2582..bba26ab8 100644
--- a/pkg/webhooks/validation/controller/controller.go
+++ b/pkg/webhooks/validation/controller/controller.go
@@ -9,7 +9,6 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/ptr"
"github.com/apache/dubbo-kubernetes/pkg/webhooks/util"
"github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
- "istio.io/api/label"
kubeApiAdmission "k8s.io/api/admissionregistration/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
@@ -152,7 +151,7 @@ func newController(o Options, client kube.Client) *Controller {
controllers.WithRateLimiter(workqueue.NewTypedItemExponentialFailureRateLimiter[any](100*time.Millisecond,
1*time.Minute)))
c.webhooks =
kclient.NewFiltered[*kubeApiAdmission.ValidatingWebhookConfiguration](client,
kclient.Filter{
- LabelSelector: fmt.Sprintf("%s=%s", label.IoIstioRev.Name,
o.Revision),
+ LabelSelector: fmt.Sprintf("%s=%s", "dubbo.io/rev", o.Revision),
})
c.webhooks.AddEventHandler(controllers.ObjectHandler(c.queue.AddObject))
diff --git a/pkg/webhooks/webhookpatch.go b/pkg/webhooks/webhookpatch.go
index f8b507ac..5eaea60a 100644
--- a/pkg/webhooks/webhookpatch.go
+++ b/pkg/webhooks/webhookpatch.go
@@ -8,7 +8,6 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/webhooks/util"
"github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
- "istio.io/api/label"
v1 "k8s.io/api/admissionregistration/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
klabels "k8s.io/apimachinery/pkg/labels"
@@ -83,7 +82,7 @@ func (w *WebhookCertPatcher) patchMutatingWebhookConfig(webhookConfigName string
return errNotFound
}
// prevents a race condition between multiple istiods when the revision
is changed or modified
- v, ok := config.Labels[label.IoIstioRev.Name]
+ v, ok := config.Labels["dubbo.io/rev"]
if !ok {
return nil
}
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index 6207b934..97c1085e 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -47,7 +47,6 @@ import (
"github.com/apache/dubbo-kubernetes/sail/pkg/server"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
-
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/serviceentry"
tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
"github.com/apache/dubbo-kubernetes/sail/pkg/xds"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
@@ -99,7 +98,6 @@ type Server struct {
ConfigStores []model.ConfigStoreController
configController model.ConfigStoreController
multiclusterController *multicluster.Controller
- serviceEntryController *serviceentry.Controller
fileWatcher filewatcher.FileWatcher
internalStop chan struct{}
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index 5d0cd60f..eb147798 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -19,6 +19,7 @@ package model
import (
"cmp"
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
@@ -49,6 +50,8 @@ const (
ProxyRequest TriggerReason = "proxyrequest"
GlobalUpdate TriggerReason = "global"
HeadlessEndpointUpdate TriggerReason = "headlessendpoint"
+ EndpointUpdate TriggerReason = "endpoint"
+ ProxyUpdate TriggerReason = "proxy"
)
type ProxyPushStatus struct {
@@ -148,7 +151,10 @@ func (pr *PushRequest) CopyMerge(other *PushRequest) *PushRequest {
type XDSUpdater interface {
ConfigUpdate(req *PushRequest)
- SvcUpdate(shard ShardKey, hostname string, namespace string, event
Event)
+ ServiceUpdate(shard ShardKey, hostname string, namespace string, event
Event)
+ EDSUpdate(shard ShardKey, hostname string, namespace string, entry
[]*DubboEndpoint)
+ EDSCacheUpdate(shard ShardKey, hostname string, namespace string, entry
[]*DubboEndpoint)
+ ProxyUpdate(clusterID cluster.ID, ip string)
}
func (ps *PushContext) InitContext(env *Environment, oldPushContext
*PushContext, pushReq *PushRequest) {
diff --git a/sail/pkg/model/service.go b/sail/pkg/model/service.go
index 0c69d941..95985f8b 100644
--- a/sail/pkg/model/service.go
+++ b/sail/pkg/model/service.go
@@ -7,6 +7,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config/protocol"
"github.com/apache/dubbo-kubernetes/pkg/config/visibility"
"github.com/apache/dubbo-kubernetes/pkg/maps"
+ "github.com/apache/dubbo-kubernetes/pkg/network"
"github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
@@ -58,29 +59,30 @@ func (p *endpointDiscoverabilityPolicyImpl) CmpOpts() []cmp.Option {
type HealthStatus int32
const (
- // Healthy indicates an endpoint is ready to accept traffic
- Healthy HealthStatus = 1
- // UnHealthy indicates an endpoint is not ready to accept traffic
- UnHealthy HealthStatus = 2
- // Draining is a special case, which is used only when persistent
sessions are enabled. This indicates an endpoint
- // was previously healthy, but is now shutting down.
- // Without persistent sessions, an endpoint that is shutting down will
be marked as Terminating.
- Draining HealthStatus = 3
- // Terminating marks an endpoint as shutting down. Similar to
"unhealthy", this means we should not send it traffic.
- // But unlike "unhealthy", this means we do not consider it when
calculating failover.
+ Healthy HealthStatus = 1
+ UnHealthy HealthStatus = 2
+ Draining HealthStatus = 3
Terminating HealthStatus = 4
)
type DubboEndpoint struct {
ServiceAccount string
Addresses []string
- WorkloadName string
ServicePortName string
Labels labels.Instance
HealthStatus HealthStatus
SendUnhealthyEndpoints bool
DiscoverabilityPolicy EndpointDiscoverabilityPolicy `json:"-"`
LegacyClusterPortKey int
+ EndpointPort uint32
+ WorkloadName string
+ Network network.ID
+ Namespace string
+ // Specifies the hostname of the Pod, empty for vm workload.
+ HostName string
+ // If specified, the fully qualified Pod hostname will be
"<hostname>.<subdomain>.<pod namespace>.svc.<cluster domain>".
+ SubDomain string
+ NodeName string
}
func (ep *DubboEndpoint) FirstAddressOrNil() string {
@@ -413,3 +415,7 @@ func (s *ServiceAttributes) Equals(other *ServiceAttributes) bool {
return s.Name == other.Name && s.Namespace == other.Namespace &&
s.ServiceRegistry == other.ServiceRegistry && s.K8sAttributes
== other.K8sAttributes
}
+
+func (s *Service) SupportsUnhealthyEndpoints() bool {
+ return false
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/controller.go b/sail/pkg/serviceregistry/kube/controller/controller.go
index 839ad194..558cfcb3 100644
--- a/sail/pkg/serviceregistry/kube/controller/controller.go
+++ b/sail/pkg/serviceregistry/kube/controller/controller.go
@@ -20,23 +20,31 @@ package controller
import (
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
+ "github.com/apache/dubbo-kubernetes/pkg/config/labels"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/config/visibility"
kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ "github.com/apache/dubbo-kubernetes/pkg/network"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
"github.com/apache/dubbo-kubernetes/pkg/queue"
"github.com/apache/dubbo-kubernetes/pkg/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/kube"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+ "github.com/hashicorp/go-multierror"
"go.uber.org/atomic"
+ "istio.io/api/label"
v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sort"
@@ -44,7 +52,12 @@ import (
"time"
)
+type controllerInterface interface {
+ Network(endpointIP string, labels labels.Instance) network.ID
+}
+
var (
+ _ controllerInterface = &Controller{}
_ serviceregistry.Instance = &Controller{}
)
@@ -58,8 +71,13 @@ type Controller struct {
configCluster bool
services kclient.Client[*v1.Service]
endpoints *endpointSliceController
- meshWatcher mesh.Watcher
- handlers model.ControllerHandlers
+ podsClient kclient.Client[*v1.Pod]
+ namespaces kclient.Client[*v1.Namespace]
+
+ meshWatcher mesh.Watcher
+ handlers model.ControllerHandlers
+ pods *PodCache
+ *networkManager
}
func NewController(kubeClient kubelib.Client, options Options) *Controller {
@@ -72,13 +90,75 @@ func NewController(kubeClient kubelib.Client, options
Options) *Controller {
configCluster: options.ConfigCluster,
}
+ c.networkManager = initNetworkManager(c, options)
+
+ c.namespaces = kclient.New[*v1.Namespace](kubeClient)
+ if c.opts.SystemNamespace != "" {
+ registerHandlers[*v1.Namespace](
+ c,
+ c.namespaces,
+ "Namespaces",
+ func(old *v1.Namespace, cur *v1.Namespace, event
model.Event) error {
+ if cur.Name == c.opts.SystemNamespace {
+ return c.onSystemNamespaceEvent(old,
cur, event)
+ }
+ return nil
+ },
+ nil,
+ )
+ }
+
c.services = kclient.NewFiltered[*v1.Service](kubeClient,
kclient.Filter{ObjectFilter: kubeClient.ObjectFilter()})
registerHandlers(c, c.services, "Services", c.onServiceEvent, nil)
c.endpoints = newEndpointSliceController(c)
+
+ c.podsClient = kclient.NewFiltered[*v1.Pod](kubeClient, kclient.Filter{
+ ObjectFilter: kubeClient.ObjectFilter(),
+ ObjectTransform: kubelib.StripPodUnusedFields,
+ })
+ c.pods = newPodCache(c, c.podsClient, func(key types.NamespacedName) {
+ c.queue.Push(func() error {
+ return c.endpoints.podArrived(key.Name, key.Namespace)
+ })
+ })
+ registerHandlers[*v1.Pod](c, c.podsClient, "Pods", c.pods.onEvent, nil)
c.meshWatcher = options.MeshWatcher
return c
}
+// onSystemNamespaceEvent handles events for the system namespace. Deletions
+// are ignored. For any other event the network is re-read from the namespace
+// and, if it changed, pods and endpoints are re-synced. It always returns nil.
+func (c *Controller) onSystemNamespaceEvent(_, ns *v1.Namespace, ev model.Event) error {
+	if ev != model.EventDelete && c.setNetworkFromNamespace(ns) {
+		// The network changed, which rarely happens: refresh pods and endpoints.
+		c.onNetworkChange()
+	}
+	return nil
+}
+
+// onNetworkChange replays every pod and every EndpointSlice through their
+// event handlers. Endpoint networks are computed while events are processed,
+// so replaying the events repairs the cached values. Failures are only logged.
+// NOTE: this must run before the other network watcher handler that creates a force push.
+func (c *Controller) onNetworkChange() {
+	podErr := c.syncPods()
+	if podErr != nil {
+		klog.Errorf("one or more errors force-syncing pods: %v", podErr)
+	}
+	sliceErr := c.endpoints.initializeNamespace(metav1.NamespaceAll, true)
+	if sliceErr != nil {
+		klog.Errorf("one or more errors force-syncing endpoints: %v", sliceErr)
+	}
+}
+
+// syncPods feeds every pod known to the pod client through the pod cache as
+// an add event and returns the combined error, or nil when none occurred.
+func (c *Controller) syncPods() error {
+	var errs *multierror.Error
+	for _, pod := range c.podsClient.List(metav1.NamespaceAll, klabels.Everything()) {
+		errs = multierror.Append(errs, c.pods.onEvent(nil, pod, model.EventAdd))
+	}
+	return errs.ErrorOrNil()
+}
+
type Options struct {
KubernetesAPIQPS float32
KubernetesAPIBurst int
@@ -138,7 +218,7 @@ func (c *Controller) deleteService(svc *model.Service) {
shard := model.ShardKeyFromRegistry(c)
event := model.EventDelete
- c.opts.XDSUpdater.SvcUpdate(shard, string(svc.Hostname),
svc.Attributes.Namespace, event)
+ c.opts.XDSUpdater.ServiceUpdate(shard, string(svc.Hostname),
svc.Attributes.Namespace, event)
if !svc.Attributes.ExportTo.Contains(visibility.None) {
c.handlers.NotifyServiceHandlers(nil, svc, event)
}
@@ -155,13 +235,50 @@ func (c *Controller) addOrUpdateService(pre, curr
*v1.Service, currConv *model.S
shard := model.ShardKeyFromRegistry(c)
ns := currConv.Attributes.Namespace
- c.opts.XDSUpdater.SvcUpdate(shard, string(currConv.Hostname), ns, event)
+ c.opts.XDSUpdater.ServiceUpdate(shard, string(currConv.Hostname), ns,
event)
if serviceUpdateNeedsPush(pre, curr, prevConv, currConv) {
klog.V(2).Infof("Service %s in namespace %s updated and needs
push", currConv.Hostname, ns)
c.handlers.NotifyServiceHandlers(prevConv, currConv, event)
}
}
+// recomputeServiceForPod rebuilds the endpoints of every Service in the pod's
+// namespace that selects the pod, refreshes the EDS cache for each of them and
+// finally requests one incremental push covering all affected services.
+//
+// A service that is not present in servicesMap is skipped; the remaining
+// services are still processed and pushed.
+func (c *Controller) recomputeServiceForPod(pod *v1.Pod) {
+	allServices := c.services.List(pod.Namespace, klabels.Everything())
+	cu := sets.New[model.ConfigKey]()
+	services := getPodServices(allServices, pod)
+	for _, svc := range services {
+		hostname := kube.ServiceHostname(svc.Name, svc.Namespace, c.opts.DomainSuffix)
+		c.Lock()
+		conv, f := c.servicesMap[hostname]
+		c.Unlock()
+		if !f {
+			// Only this service is unknown. Returning here would silently drop
+			// the updates already collected for the other matching services.
+			continue
+		}
+		shard := model.ShardKeyFromRegistry(c)
+		endpoints := c.buildEndpointsForService(conv, true)
+		if len(endpoints) > 0 {
+			c.opts.XDSUpdater.EDSCacheUpdate(shard, string(hostname), svc.Namespace, endpoints)
+		}
+		cu.Insert(model.ConfigKey{
+			Kind:      kind.ServiceEntry,
+			Name:      string(hostname),
+			Namespace: svc.Namespace,
+		})
+	}
+	if len(cu) > 0 {
+		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
+			Full:           false,
+			ConfigsUpdated: cu,
+			Reason:         model.NewReasonStats(model.EndpointUpdate),
+		})
+	}
+}
+
+// buildEndpointsForService returns the endpoints of svc as produced by the
+// EndpointSlice controller; updateCache is forwarded unchanged.
+func (c *Controller) buildEndpointsForService(svc *model.Service, updateCache bool) []*model.DubboEndpoint {
+	return c.endpoints.buildDubboEndpointsWithService(
+		svc.Attributes.Name,
+		svc.Attributes.Namespace,
+		svc.Hostname,
+		updateCache,
+	)
+}
+
func serviceUpdateNeedsPush(prev, curr *v1.Service, preConv, currConv
*model.Service) bool {
// New Service - If it is not exported, no need to push.
if preConv == nil {
@@ -223,7 +340,17 @@ func (c *Controller) HasSynced() bool {
}
func (c *Controller) informersSynced() bool {
- return true
+ // Synced only when the namespace, pod, service and EndpointSlice clients
+ // and the network manager all report HasSynced.
+ return c.namespaces.HasSynced() &&
+ c.pods.pods.HasSynced() &&
+ c.services.HasSynced() &&
+ c.endpoints.slices.HasSynced() &&
+ c.networkManager.HasSynced()
+}
+
+// hostNamesForNamespacedName returns the hostnames for the Service identified
+// by name: a single hostname built with the configured domain suffix.
+func (c *Controller) hostNamesForNamespacedName(name types.NamespacedName)
[]host.Name {
+ return []host.Name{
+ kube.ServiceHostname(name.Name, name.Namespace,
c.opts.DomainSuffix),
+ }
}
type FilterOutFunc[T controllers.Object] func(old, cur T) bool
@@ -273,3 +400,20 @@ func (c *Controller) servicesForNamespacedName(name
types.NamespacedName) []*mod
}
return nil
}
+
+// Network resolves the network ID of an endpoint. Sources are consulted in
+// order and the first non-empty value wins; the result is empty when none of
+// them yields a network.
+func (c *Controller) Network(endpointIP string, labels labels.Instance) network.ID {
+	// 1. the pod/workloadEntry label
+	if fromLabel := labels[label.TopologyNetwork.Name]; fromLabel != "" {
+		return network.ID(fromLabel)
+	}
+	// 2. the system namespace labels
+	if fromNamespace := c.networkFromSystemNamespace(); fromNamespace != "" {
+		return fromNamespace
+	}
+	// 3. the meshNetworks config; its (possibly empty) answer is final
+	return c.networkFromMeshNetworks(endpointIP)
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/endpoint_builder.go
b/sail/pkg/serviceregistry/kube/controller/endpoint_builder.go
new file mode 100644
index 00000000..83b32eb0
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/endpoint_builder.go
@@ -0,0 +1,97 @@
+package controller
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+ "github.com/apache/dubbo-kubernetes/pkg/network"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/kube"
+ labelutil
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/util/label"
+ v1 "k8s.io/api/core/v1"
+)
+
+// EndpointBuilder carries the per-pod attributes that are copied into every
+// DubboEndpoint created through buildDubboEndpoint.
+type EndpointBuilder struct {
+ // controller resolves the endpoint network when metaNetwork is empty.
+ controller controllerInterface
+ labels labels.Instance
+ // metaNetwork, when set, takes precedence over the controller's network lookup.
+ metaNetwork network.ID
+ serviceAccount string
+ workloadName string
+ namespace string
+ hostname string
+ subDomain string
+ nodeName string
+}
+
+// NewEndpointBuilder creates an EndpointBuilder from pod. pod may be nil, in
+// which case the builder carries empty pod attributes and only the augmented
+// labels.
+//
+// hostname is only populated when the pod declares a subdomain; it falls back
+// to the pod name when the pod spec has no explicit hostname.
+func (c *Controller) NewEndpointBuilder(pod *v1.Pod) *EndpointBuilder {
+	var sa, namespace, hostname, subdomain, ip, node string
+	var podLabels labels.Instance
+	if pod != nil {
+		sa = kube.SecureNamingSAN(pod, c.meshWatcher.Mesh())
+		// The pod IP feeds the network lookup below; without this assignment
+		// the lookup was always performed with an empty address.
+		ip = pod.Status.PodIP
+		podLabels = pod.Labels
+		namespace = pod.Namespace
+		subdomain = pod.Spec.Subdomain
+		if subdomain != "" {
+			hostname = pod.Spec.Hostname
+			if hostname == "" {
+				hostname = pod.Name
+			}
+		}
+		node = pod.Spec.NodeName
+	}
+	out := &EndpointBuilder{
+		controller:     c,
+		serviceAccount: sa,
+		namespace:      namespace,
+		hostname:       hostname,
+		subDomain:      subdomain,
+		labels:         podLabels,
+		nodeName:       node,
+	}
+	// The raw pod labels are needed for the network lookup; afterwards they
+	// are replaced by the augmented label set.
+	networkID := out.endpointNetwork(ip)
+	out.labels = labelutil.AugmentLabels(podLabels, c.Cluster(), "", node, networkID)
+	return out
+}
+
+// buildDubboEndpoint creates a DubboEndpoint for one address/port pair using
+// the attributes stored in the builder. It returns nil when b is nil.
+//
+// When the builder's labels carry no network yet (for example because no pod
+// was available when the builder was created), the network is resolved from
+// endpointAddress and written back into the builder's labels, so the label
+// map is shared by every endpoint built from the same builder.
+func (b *EndpointBuilder) buildDubboEndpoint(
+	endpointAddress string,
+	endpointPort int32,
+	svcPortName string,
+	discoverabilityPolicy model.EndpointDiscoverabilityPolicy,
+	healthStatus model.HealthStatus,
+	sendUnhealthy bool,
+) *model.DubboEndpoint {
+	if b == nil {
+		return nil
+	}
+
+	// in case pod is not found when init EndpointBuilder.
+	networkID := network.ID(b.labels["topology.dubbo.io/network"])
+	if networkID == "" {
+		networkID = b.endpointNetwork(endpointAddress)
+		// Reading a nil map is fine, writing to one panics: make sure the
+		// label map exists before recording the resolved network.
+		if b.labels == nil {
+			b.labels = labels.Instance{}
+		}
+		b.labels["topology.dubbo.io/network"] = string(networkID)
+	}
+
+	return &model.DubboEndpoint{
+		Labels:                 b.labels,
+		ServiceAccount:         b.serviceAccount,
+		Addresses:              []string{endpointAddress},
+		EndpointPort:           uint32(endpointPort),
+		ServicePortName:        svcPortName,
+		Network:                networkID,
+		Namespace:              b.namespace,
+		HostName:               b.hostname,
+		SubDomain:              b.subDomain,
+		DiscoverabilityPolicy:  discoverabilityPolicy,
+		HealthStatus:           healthStatus,
+		SendUnhealthyEndpoints: sendUnhealthy,
+		NodeName:               b.nodeName,
+	}
+}
+
+// endpointNetwork returns the network for endpointIP. A metaNetwork recorded
+// on the builder takes precedence; otherwise the controller is asked, using
+// the builder's current labels.
+func (b *EndpointBuilder) endpointNetwork(endpointIP string) network.ID {
+	if b.metaNetwork == "" {
+		return b.controller.Network(endpointIP, b.labels)
+	}
+	// Built from proxy metadata: prefer the injected network value.
+	return b.metaNetwork
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/endpointslice.go
b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
index 3e240cc9..0af5fa38 100644
--- a/sail/pkg/serviceregistry/kube/controller/endpointslice.go
+++ b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
@@ -2,18 +2,23 @@ package controller
import (
"github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/config/visibility"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "github.com/hashicorp/go-multierror"
"istio.io/api/annotation"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/discovery/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
+ "k8s.io/apimachinery/pkg/types"
mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
"strings"
+ "sync"
)
var (
@@ -22,15 +27,29 @@ var (
)
type endpointSliceController struct {
- slices kclient.Client[*v1.EndpointSlice]
- c *Controller
+ // endpointCache holds the endpoints built from each EndpointSlice, per service hostname.
+ endpointCache *endpointSliceCache
+ slices kclient.Client[*v1.EndpointSlice]
+ c *Controller
+}
+
+// endpointSliceCache stores built endpoints keyed by service hostname and
+// then by EndpointSlice name. mu guards the map.
+type endpointSliceCache struct {
+ mu sync.RWMutex
+ endpointsByServiceAndSlice
map[host.Name]map[string][]*model.DubboEndpoint
+}
+
+// newEndpointSliceCache returns an empty endpointSliceCache with its map allocated.
+func newEndpointSliceCache() *endpointSliceCache {
+ out := &endpointSliceCache{
+ endpointsByServiceAndSlice:
make(map[host.Name]map[string][]*model.DubboEndpoint),
+ }
+ return out
}
func newEndpointSliceController(c *Controller) *endpointSliceController {
slices := kclient.NewFiltered[*v1.EndpointSlice](c.client,
kclient.Filter{ObjectFilter: c.client.ObjectFilter()})
out := &endpointSliceController{
- c: c,
- slices: slices,
+ c: c,
+ slices: slices,
+ endpointCache: newEndpointSliceCache(),
}
registerHandlers[*v1.EndpointSlice](c, slices, "EndpointSlice",
out.onEvent, nil)
return out
@@ -46,15 +65,11 @@ func (esc *endpointSliceController) onEventInternal(_, ep
*v1.EndpointSlice, eve
if !endpointSliceSelector.Matches(klabels.Set(esLabels)) {
return
}
- // Update internal endpoint cache no matter what kind of service, even
headless service.
- // As for gateways, the cluster discovery type is `EDS` for headless
service.
- // namespacedName := getServiceNamespacedName(ep)
- // log.Debugf("Handle EDS endpoint %s %s in namespace %s",
namespacedName.Name, event, namespacedName.Namespace)
- // if event == model.EventDelete {
- // esc.deleteEndpointSlice(ep)
- // } else {
- // esc.updateEndpointSlice(ep)
- // }
+ if event == model.EventDelete {
+ esc.deleteEndpointSlice(ep)
+ } else {
+ esc.updateEndpointSlice(ep)
+ }
// Now check if we need to do a full push for the service.
// If the service is headless, we need to do a full push if service
exposes TCP ports
@@ -66,10 +81,9 @@ func (esc *endpointSliceController) onEventInternal(_, ep
*v1.EndpointSlice, eve
return
}
- // hostnames := esc.c.hostNamesForNamespacedName(namespacedName)
- // log.Debugf("triggering EDS push for %s in namespace %s", hostnames,
namespacedName.Namespace)
- // Trigger EDS push for all hostnames.
- // esc.pushEDS(hostnames, namespacedName.Namespace)
+ namespacedName := getServiceNamespacedName(ep)
+ hostnames := esc.c.hostNamesForNamespacedName(namespacedName)
+ esc.pushEDS(hostnames, namespacedName.Namespace)
if svc == nil || svc.Spec.ClusterIP != corev1.ClusterIPNone ||
svc.Spec.Type == corev1.ServiceTypeExternalName {
return
@@ -85,8 +99,6 @@ func (esc *endpointSliceController) onEventInternal(_, ep
*v1.EndpointSlice, eve
}
}
if supportsOnlyHTTP {
- // pure HTTP headless services should not need a full
push since they do not
- // require a Listener based on IP:
https://github.com/istio/istio/issues/48207
configsUpdated.Insert(model.ConfigKey{Kind:
kind.DNSName, Name: modelSvc.Hostname.String(), Namespace: svc.Namespace})
} else {
configsUpdated.Insert(model.ConfigKey{Kind:
kind.ServiceEntry, Name: modelSvc.Hostname.String(), Namespace: svc.Namespace})
@@ -102,10 +114,186 @@ func (esc *endpointSliceController) onEventInternal(_,
ep *v1.EndpointSlice, eve
}
}
+// deleteEndpointSlice forgets a removed EndpointSlice: every address of the
+// slice is reported to the pod cache as deleted, and the slice's entry is
+// removed from the endpoint cache for each hostname of its service.
+func (esc *endpointSliceController) deleteEndpointSlice(slice *v1.EndpointSlice) {
+	sliceKey := config.NamespacedName(slice)
+	for i := range slice.Endpoints {
+		for _, addr := range slice.Endpoints[i].Addresses {
+			esc.c.pods.endpointDeleted(sliceKey, addr)
+		}
+	}
+
+	cache := esc.endpointCache
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+	hostnames := esc.c.hostNamesForNamespacedName(getServiceNamespacedName(slice))
+	for _, hostname := range hostnames {
+		if !cache.has(hostname) {
+			continue
+		}
+		// endpointSlice cache update
+		cache.delete(hostname, slice.Name)
+	}
+}
+
+// has reports whether the cache holds an entry for hostname. It does not
+// lock; the caller must hold e.mu.
+func (e *endpointSliceCache) has(hostname host.Name) bool {
+	if _, ok := e.endpointsByServiceAndSlice[hostname]; ok {
+		return true
+	}
+	return false
+}
+
+// delete removes the entry of slice under hostname and drops the hostname
+// itself once no slice is left. It does not lock; the caller must hold e.mu.
+func (e *endpointSliceCache) delete(hostname host.Name, slice string) {
+	bySlice := e.endpointsByServiceAndSlice[hostname]
+	delete(bySlice, slice)
+	if len(bySlice) == 0 {
+		delete(e.endpointsByServiceAndSlice, hostname)
+	}
+}
+
+// updateEndpointSlice refreshes the endpoint cache for every hostname of the
+// service that slice belongs to.
+func (esc *endpointSliceController) updateEndpointSlice(slice *v1.EndpointSlice) {
+	serviceKey := getServiceNamespacedName(slice)
+	hostnames := esc.c.hostNamesForNamespacedName(serviceKey)
+	for i := range hostnames {
+		esc.updateEndpointCacheForSlice(hostnames[i], slice)
+	}
+}
+
+// initializeNamespace replays every EndpointSlice of namespace ns as an add
+// event. filtered selects between the client's List and ListUnfiltered.
+// The errors of all replayed events are combined; nil means none failed.
+func (esc *endpointSliceController) initializeNamespace(ns string, filtered bool) error {
+	selector := klabels.Everything()
+	var slices []*v1.EndpointSlice
+	switch {
+	case filtered:
+		slices = esc.slices.List(ns, selector)
+	default:
+		slices = esc.slices.ListUnfiltered(ns, selector)
+	}
+
+	var errs *multierror.Error
+	for i := range slices {
+		errs = multierror.Append(errs, esc.onEvent(nil, slices[i], model.EventAdd))
+	}
+	return errs.ErrorOrNil()
+}
+
+// updateEndpointCacheForSlice rebuilds the endpoints described by epSlice for
+// hostName and stores them in the endpoint cache under the slice's name.
+//
+// FQDN slices and slices without a service-name label are ignored. An
+// address whose target is a Pod that cannot be found is skipped; getPod
+// registers it so the slice is re-processed when that pod arrives.
+func (esc *endpointSliceController) updateEndpointCacheForSlice(hostName host.Name, epSlice *v1.EndpointSlice) {
+	if epSlice.AddressType == v1.AddressTypeFQDN {
+		return
+	}
+	svc := esc.c.GetService(hostName)
+	// This is not an EndpointSlice for a service, ignore.
+	if getServiceNamespacedName(epSlice).Name == "" {
+		return
+	}
+
+	var endpoints []*model.DubboEndpoint
+	sliceMeta := &metav1.ObjectMeta{Name: epSlice.Name, Namespace: epSlice.Namespace}
+	for _, e := range epSlice.Endpoints {
+		healthStatus := endpointHealthStatus(svc, e)
+		for _, a := range e.Addresses {
+			pod, expectedPod := getPod(esc.c, a, sliceMeta, e.TargetRef, hostName)
+			if pod == nil && expectedPod {
+				continue
+			}
+
+			builder := esc.c.NewEndpointBuilder(pod)
+			// EDS and ServiceEntry use name for service port - ADS will need to map to numbers.
+			for _, port := range epSlice.Ports {
+				var portNum int32
+				if port.Port != nil {
+					portNum = *port.Port
+				}
+				var portName string
+				if port.Name != nil {
+					portName = *port.Name
+				}
+
+				// NOTE(review): svc may be nil here if the service is not known
+				// yet; this relies on SupportsUnhealthyEndpoints tolerating a
+				// nil receiver - confirm.
+				dubboEndpoint := builder.buildDubboEndpoint(a, portNum, portName, nil, healthStatus, svc.SupportsUnhealthyEndpoints())
+				endpoints = append(endpoints, dubboEndpoint)
+			}
+		}
+	}
+	esc.endpointCache.Update(hostName, epSlice.Name, endpoints)
+}
+
+// Update stores endpoints for the given hostname and slice while holding the
+// cache's write lock; see update for the storage rules.
+func (e *endpointSliceCache) Update(hostname host.Name, slice string,
endpoints []*model.DubboEndpoint) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ e.update(hostname, slice, endpoints)
+}
+
+// update records endpoints as the content of slice under hostname. It does
+// not lock; the caller must hold e.mu.
+//
+// An empty endpoint list removes the slice's entry, and the hostname's entry
+// once no slice is left, instead of storing an empty placeholder.
+func (e *endpointSliceCache) update(hostname host.Name, slice string, endpoints []*model.DubboEndpoint) {
+	if len(endpoints) == 0 {
+		// Previously the entry was deleted here and then re-created below as
+		// an empty list, so empty slices were never actually evicted.
+		delete(e.endpointsByServiceAndSlice[hostname], slice)
+		if len(e.endpointsByServiceAndSlice[hostname]) == 0 {
+			delete(e.endpointsByServiceAndSlice, hostname)
+		}
+		return
+	}
+	if _, f := e.endpointsByServiceAndSlice[hostname]; !f {
+		e.endpointsByServiceAndSlice[hostname] = make(map[string][]*model.DubboEndpoint)
+	}
+	// We always overwrite. A conflict here means an endpoint is transitioning
+	// from one slice to another. See
+	// https://github.com/kubernetes/website/blob/master/content/en/docs/concepts/services-networking/endpoint-slices.md#duplicate-endpoints
+	// The incoming update is assumed to be fresh, while older slices that
+	// have not been updated yet may be stale; therefore the new update wins.
+	e.endpointsByServiceAndSlice[hostname][slice] = endpoints
+}
+
+// endpointHealthStatus maps the conditions of an EndpointSlice endpoint to a
+// health status:
+//   - Ready unset or true: Healthy;
+//   - otherwise, with a known service and Terminating unset or true: Terminating;
+//   - otherwise: UnHealthy.
+func endpointHealthStatus(svc *model.Service, e v1.Endpoint) model.HealthStatus {
+	ready, terminating := e.Conditions.Ready, e.Conditions.Terminating
+	switch {
+	case ready == nil || *ready:
+		return model.Healthy
+	case svc != nil && (terminating == nil || *terminating):
+		// Shutting down: reported as terminating regardless of earlier health.
+		return model.Terminating
+	default:
+		return model.UnHealthy
+	}
+}
+
func serviceNameForEndpointSlice(labels map[string]string) string {
return labels[v1.LabelServiceName]
}
+func getPod(c *Controller, ip string, ep *metav1.ObjectMeta, targetRef
*corev1.ObjectReference, host host.Name) (*corev1.Pod, bool) {
+ var expectPod bool
+ pod := c.getPod(ip, ep.Namespace, targetRef)
+ if targetRef != nil && targetRef.Kind == kind.Pod.String() {
+ expectPod = true
+ if pod == nil {
+ c.registerEndpointResync(ep, ip, host)
+ }
+ }
+
+ return pod, expectPod
+}
+
+// registerEndpointResync asks the pod cache to queue an event for the
+// endpoint object ep once a pod with the given ip arrives. host is unused.
+func (c *Controller) registerEndpointResync(ep *metav1.ObjectMeta, ip string, host host.Name) {
+	endpointKey := config.NamespacedName(ep)
+	c.pods.queueEndpointEventOnPodArrival(endpointKey, ip)
+}
+
+// getPod resolves the pod for an endpoint. A targetRef of kind Pod is looked
+// up by name and namespace. Otherwise the endpoint is manually controlled and
+// the first pod with the given ip in namespace is returned, or nil when there
+// is none.
+func (c *Controller) getPod(ip string, namespace string, targetRef *corev1.ObjectReference) *corev1.Pod {
+	if targetRef == nil || targetRef.Kind != kind.Pod.String() {
+		// We want a pod to find metadata like service account and labels. For
+		// hostNetwork we only get a raw IP that may be shared by many pods,
+		// so the best we can do is guess.
+		for _, candidate := range c.pods.getPodsByIP(ip) {
+			if candidate.Namespace == namespace {
+				// Might not be right, but best we can do.
+				return candidate
+			}
+		}
+		return nil
+	}
+	podKey := types.NamespacedName{Name: targetRef.Name, Namespace: targetRef.Namespace}
+	return c.pods.getPodByKey(podKey)
+}
+
+func getServiceNamespacedName(slice *v1.EndpointSlice) types.NamespacedName {
+ return types.NamespacedName{
+ Namespace: slice.GetNamespace(),
+ Name: serviceNameForEndpointSlice(slice.GetLabels()),
+ }
+}
+
func serviceNeedsPush(svc *corev1.Service) bool {
if svc.Annotations[annotation.NetworkingExportTo.Name] != "" {
namespaces :=
strings.Split(svc.Annotations[annotation.NetworkingExportTo.Name], ",")
@@ -118,3 +306,73 @@ func serviceNeedsPush(svc *corev1.Service) bool {
}
return true
}
+
+// pushEDS sends the cached endpoints of each hostname to the XDS updater as
+// an EDS update for namespace. The cache's exclusive lock is held for the
+// whole call, so concurrent pushEDS calls run one after another.
+func (esc *endpointSliceController) pushEDS(hostnames []host.Name, namespace string) {
+	shard := model.ShardKeyFromRegistry(esc.c)
+	cache := esc.endpointCache
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+
+	for i := range hostnames {
+		name := hostnames[i]
+		esc.c.opts.XDSUpdater.EDSUpdate(shard, string(name), namespace, cache.get(name))
+	}
+}
+
+// endpointKey identifies an endpoint by its first address and service port
+// name; it is used to drop duplicates when reading the endpoint cache.
+type endpointKey struct {
+ ip string
+ port string
+}
+
+// get returns the endpoints cached for hostname across all of its slices,
+// keeping only the first endpoint seen for each (address, service port name)
+// pair. It does not lock; the caller must hold e.mu.
+func (e *endpointSliceCache) get(hostname host.Name) []*model.DubboEndpoint {
+	var unique []*model.DubboEndpoint
+	seen := sets.New[endpointKey]()
+	for _, sliceEndpoints := range e.endpointsByServiceAndSlice[hostname] {
+		for _, ep := range sliceEndpoints {
+			// A duplicate needs no special handling: Update() already resolves
+			// conflicts, so whichever copy comes first is good enough.
+			if !seen.InsertContains(endpointKey{ep.FirstAddressOrNil(), ep.ServicePortName}) {
+				unique = append(unique, ep)
+			}
+		}
+	}
+	return unique
+}
+
+// podArrived re-processes the EndpointSlice name/ns as an add event. It is a
+// no-op returning nil when the slice no longer exists.
+func (esc *endpointSliceController) podArrived(name, ns string) error {
+	if slice := esc.slices.Get(name, ns); slice != nil {
+		return esc.onEvent(nil, slice, model.EventAdd)
+	}
+	return nil
+}
+
+// buildDubboEndpointsWithService returns the cached endpoints of hostName.
+// It returns nil when the service name/namespace has no EndpointSlices. With
+// updateCache set, the cache is first rebuilt from those slices.
+func (esc *endpointSliceController) buildDubboEndpointsWithService(name, namespace string, hostName host.Name, updateCache bool) []*model.DubboEndpoint {
+	matching := esc.slices.List(namespace, endpointSliceSelectorForService(name))
+	if len(matching) == 0 {
+		return nil
+	}
+
+	if updateCache {
+		// A cache update was requested. Rebuild the endpoints for these slices.
+		for i := range matching {
+			esc.updateEndpointCacheForSlice(hostName, matching[i])
+		}
+	}
+
+	return esc.endpointCache.Get(hostName)
+}
+
+// Get returns the de-duplicated endpoints cached for hostname while holding
+// the cache's read lock.
+func (e *endpointSliceCache) Get(hostname host.Name) []*model.DubboEndpoint {
+ e.mu.RLock()
+ defer e.mu.RUnlock()
+ return e.get(hostname)
+}
+
+// endpointSliceSelectorForService returns a selector matching EndpointSlices
+// whose service-name label equals name and that also satisfy
+// endpointSliceRequirement.
+func endpointSliceSelectorForService(name string) klabels.Selector {
+	byService := klabels.Set{v1.LabelServiceName: name}
+	return byService.AsSelectorPreValidated().Add(*endpointSliceRequirement)
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/multicluster.go
b/sail/pkg/serviceregistry/kube/controller/multicluster.go
index cad7a102..06c956a1 100644
--- a/sail/pkg/serviceregistry/kube/controller/multicluster.go
+++ b/sail/pkg/serviceregistry/kube/controller/multicluster.go
@@ -7,15 +7,13 @@ import (
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/server"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
-
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/serviceentry"
"k8s.io/klog/v2"
)
type kubeController struct {
MeshServiceController *aggregate.Controller
*Controller
- workloadEntryController *serviceentry.Controller
- stop chan struct{}
+ stop chan struct{}
}
func (k *kubeController) Close() {
diff --git a/sail/pkg/serviceregistry/kube/controller/network.go
b/sail/pkg/serviceregistry/kube/controller/network.go
new file mode 100644
index 00000000..32a67988
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/network.go
@@ -0,0 +1,56 @@
+package controller
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+ "github.com/apache/dubbo-kubernetes/pkg/network"
+ v1 "k8s.io/api/core/v1"
+ "sync"
+)
+
+// networkManager tracks the network IDs known to the controller. The embedded
+// RWMutex guards network and networkFromMeshConfig.
+type networkManager struct {
+ sync.RWMutex
+ clusterID cluster.ID
+ meshNetworksWatcher mesh.NetworksWatcher
+ // network is the value taken from the system namespace's network label.
+ network network.ID
+ networkFromMeshConfig network.ID
+}
+
+// initNetworkManager creates a networkManager for the cluster described by
+// options. Both network IDs start out empty. c is currently unused.
+func initNetworkManager(c *Controller, options Options) *networkManager {
+	return &networkManager{
+		clusterID:           options.ClusterID,
+		meshNetworksWatcher: options.MeshNetworksWatcher,
+	}
+}
+
+// networkFromSystemNamespace returns the network last recorded by
+// setNetworkFromNamespace, read under the read lock. It may be empty.
+func (n *networkManager) networkFromSystemNamespace() network.ID {
+ n.RLock()
+ defer n.RUnlock()
+ return n.network
+}
+
+// networkFromMeshNetworks returns the network derived from the mesh config,
+// or the empty ID when none is set. endpointIP is currently not consulted.
+func (n *networkManager) networkFromMeshNetworks(endpointIP string) network.ID {
+	n.RLock()
+	fromMeshConfig := n.networkFromMeshConfig
+	n.RUnlock()
+	return fromMeshConfig
+}
+
+// HasSynced always reports true; the network manager has nothing to wait for.
+func (n *networkManager) HasSynced() bool {
+ return true
+}
+
+// setNetworkFromNamespace stores the value of the namespace's
+// "topology.dubbo.io/network" label as the current network and reports
+// whether that changed the stored value.
+func (n *networkManager) setNetworkFromNamespace(ns *v1.Namespace) bool {
+	updated := network.ID(ns.Labels["topology.dubbo.io/network"])
+	n.Lock()
+	defer n.Unlock()
+	changed := n.network != updated
+	n.network = updated
+	return changed
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/pod.go
b/sail/pkg/serviceregistry/kube/controller/pod.go
new file mode 100644
index 00000000..bef04660
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/pod.go
@@ -0,0 +1,252 @@
+package controller
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+ "github.com/apache/dubbo-kubernetes/pkg/maps"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "istio.io/api/annotation"
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "sync"
+)
+
+// PodCache indexes pods by IP on top of the pod client and remembers which
+// endpoint objects are waiting for a pod to show up.
+type PodCache struct {
+ pods kclient.Client[*v1.Pod]
+
+ sync.RWMutex
+ // podsByIP maintains a stable mapping from pod IP to pod name keys,
+ // which allows retrieving the latest status by pod IP.
+ // This should only contain RUNNING or PENDING pods with an allocated IP.
+ podsByIP map[string]sets.Set[types.NamespacedName]
+ // ipByPods is the reverse map of podsByIP. It exists so that stale entries
+ // can be pruned from the pod cache if a pod changes IP.
+ ipByPods map[types.NamespacedName]string
+
+ // needResync maps an IP to the endpoint namespace/names waiting for it. It is
+ // used to requeue endpoint events when the pod event comes, which typically
+ // happens when the pod is not yet in the cache when the endpoint event arrives.
+ needResync map[string]sets.Set[types.NamespacedName]
+ // queueEndpointEvent is called for each waiting endpoint key when its pod is added.
+ queueEndpointEvent func(types.NamespacedName)
+
+ c *Controller
+}
+
+// newPodCache creates an empty PodCache backed by the pods client.
+// queueEndpointEvent is invoked with the key of every endpoint object that
+// was waiting for a pod when that pod is added to the cache.
+func newPodCache(c *Controller, pods kclient.Client[*v1.Pod], queueEndpointEvent func(types.NamespacedName)) *PodCache {
+	return &PodCache{
+		c:                  c,
+		pods:               pods,
+		queueEndpointEvent: queueEndpointEvent,
+		podsByIP:           make(map[string]sets.Set[types.NamespacedName]),
+		ipByPods:           make(map[types.NamespacedName]string),
+		needResync:         make(map[string]sets.Set[types.NamespacedName]),
+	}
+}
+
+// endpointDeleted removes the endpoint key from the set of endpoints waiting
+// for a pod with the given ip, under the write lock.
+func (pc *PodCache) endpointDeleted(key types.NamespacedName, ip string) {
+ pc.Lock()
+ defer pc.Unlock()
+ sets.DeleteCleanupLast(pc.needResync, ip, key)
+}
+
+// getPodByKey fetches the pod from the underlying pod client by name and namespace.
+func (pc *PodCache) getPodByKey(key types.NamespacedName) *v1.Pod {
+ return pc.pods.Get(key.Name, key.Namespace)
+}
+
+// getPodKeys returns, in no particular order, the keys of the pods recorded
+// for addr, read under the read lock.
+func (pc *PodCache) getPodKeys(addr string) []types.NamespacedName {
+ pc.RLock()
+ defer pc.RUnlock()
+ return pc.podsByIP[addr].UnsortedList()
+}
+
+// getPodsByIP returns the pods currently recorded for addr. Keys whose pod
+// can no longer be fetched are left out. It returns nil when no key list is
+// available for addr.
+func (pc *PodCache) getPodsByIP(addr string) []*v1.Pod {
+	keys := pc.getPodKeys(addr)
+	if keys == nil {
+		return nil
+	}
+	found := make([]*v1.Pod, 0, len(keys))
+	for i := range keys {
+		// Subtle race: getPodKeys reads our own index while getPodByKey hits
+		// the informer cache. If the two are out of sync the pod may already
+		// be gone, in which case the lookup yields nil.
+		if p := pc.getPodByKey(keys[i]); p != nil {
+			found = append(found, p)
+		}
+	}
+	return found
+}
+
+// queueEndpointEventOnPodArrival records that the endpoint object key is
+// waiting for a pod with the given ip; addPod queues an event for it once
+// such a pod is added.
+func (pc *PodCache) queueEndpointEventOnPodArrival(key types.NamespacedName,
ip string) {
+ pc.Lock()
+ defer pc.Unlock()
+ sets.InsertOrNew(pc.needResync, ip, key)
+}
+
+// getIPByPod returns the IP recorded for the pod key, or "" when the pod is
+// not in the cache. It reads under the read lock.
+func (pc *PodCache) getIPByPod(key types.NamespacedName) string {
+ pc.RLock()
+ defer pc.RUnlock()
+ return pc.ipByPods[key]
+}
+
+// onEvent keeps the pod cache in sync with pod events.
+//
+//   - Add: a pod that belongs in endpoints and is ready is added to the cache.
+//   - Update: such a pod is added or refreshed (old is compared with pod to
+//     detect label changes); any other pod is removed if it was cached.
+//   - Delete: the pod is removed if it was cached.
+//
+// Pods without an IP are ignored unless the cache still remembers an IP for
+// them. The function currently always returns nil.
+func (pc *PodCache) onEvent(old, pod *v1.Pod, ev model.Event) error {
+	key := config.NamespacedName(pod)
+	ip := pod.Status.PodIP
+	// PodIP is empty when the pod was just created and no IP has been assigned
+	// yet via UpdateStatus.
+	if len(ip) == 0 {
+		// However, on eviction the event that marks the pod as Failed may also
+		// have removed the IP. If the pod used to have one, it must still be
+		// deleted from the cache.
+		ip = pc.getIPByPod(key)
+		if len(ip) == 0 {
+			return nil
+		}
+	}
+
+	switch ev {
+	case model.EventAdd:
+		if !shouldPodBeInEndpoints(pod) || !IsPodReady(pod) {
+			return nil
+		}
+		pc.addPod(pod, ip, key, false)
+	case model.EventUpdate:
+		if shouldPodBeInEndpoints(pod) && IsPodReady(pod) {
+			pc.addPod(pod, ip, key, pc.labelFilter(old, pod))
+		} else {
+			// delete only if this pod was in the cache
+			if !pc.deleteIP(ip, key) {
+				return nil
+			}
+			// The effective event is a deletion; kept for the workload
+			// handler notification that is still to be implemented below.
+			ev = model.EventDelete
+		}
+	case model.EventDelete:
+		// delete only if this pod was in the cache; in most cases it has
+		// already been deleted in `UPDATE` with `DeletionTimestamp` set.
+		if !pc.deleteIP(ip, key) {
+			return nil
+		}
+	}
+	// TODO notifyWorkloadHandlers
+	return nil
+}
+
+// addPod records pod under ip in the cache. If the pod is already cached under
+// that ip, nothing is changed and only a label update is propagated. Otherwise
+// any entry under a previous IP is removed, the new mapping is stored, every
+// endpoint waiting for this ip is queued, and a proxy update is sent. The lock
+// is released before calling proxyUpdates.
+func (pc *PodCache) addPod(pod *v1.Pod, ip string, key types.NamespacedName,
labelUpdated bool) {
+ pc.Lock()
+ // if the pod has been cached, return
+ if pc.podsByIP[ip].Contains(key) {
+ pc.Unlock()
+ if labelUpdated {
+ pc.proxyUpdates(pod, true)
+ }
+ return
+ }
+ if current, f := pc.ipByPods[key]; f {
+ // The pod already exists, but with another IP address: clean up the old entry.
+ sets.DeleteCleanupLast(pc.podsByIP, current, key)
+ }
+ sets.InsertOrNew(pc.podsByIP, ip, key)
+ pc.ipByPods[key] = ip
+
+ // Endpoints that were seen before this pod can now be re-processed.
+ if endpointsToUpdate, f := pc.needResync[ip]; f {
+ delete(pc.needResync, ip)
+ for epKey := range endpointsToUpdate {
+ pc.queueEndpointEvent(epKey)
+ }
+ }
+ pc.Unlock()
+
+ pc.proxyUpdates(pod, false)
+}
+
+// deleteIP removes podKey from the cache if it is recorded under ip and
+// reports whether anything was removed.
+func (pc *PodCache) deleteIP(ip string, podKey types.NamespacedName) bool {
+	pc.Lock()
+	defer pc.Unlock()
+	if !pc.podsByIP[ip].Contains(podKey) {
+		return false
+	}
+	sets.DeleteCleanupLast(pc.podsByIP, ip, podKey)
+	delete(pc.ipByPods, podKey)
+	return true
+}
+
+// labelFilter reports whether an update from old to cur changed anything that
+// affects endpoints: the pod labels, or the ambient-redirection annotation
+// (the only annotation that is compared).
+func (pc *PodCache) labelFilter(old, cur *v1.Pod) bool {
+	if !maps.Equal(old.Labels, cur.Labels) {
+		return true
+	}
+	redirection := annotation.AmbientRedirection.Name
+	return old.Annotations[redirection] != cur.Annotations[redirection]
+}
+
+// proxyUpdates notifies the XDS updater (when configured) about the pod's IP
+// and, for updates of an already-known pod, recomputes the services selecting
+// it. It does nothing when the cache has no controller.
+func (pc *PodCache) proxyUpdates(pod *v1.Pod, isPodUpdate bool) {
+	if pc.c == nil {
+		return
+	}
+	if pc.c.opts.XDSUpdater != nil {
+		pc.c.opts.XDSUpdater.ProxyUpdate(pc.c.Cluster(), pod.Status.PodIP)
+	}
+	if !isPodUpdate {
+		// A new pod needs no recompute: its services have not been computed yet.
+		return
+	}
+	// Recompute service(s) due to pod label change.
+	pc.c.recomputeServiceForPod(pod)
+}
+
+// shouldPodBeInEndpoints reports whether pod is eligible to back endpoints:
+// it must not be in a terminal phase, must have at least one IP, and must not
+// be marked for deletion.
+func shouldPodBeInEndpoints(pod *v1.Pod) bool {
+	switch {
+	case isPodPhaseTerminal(pod.Status.Phase):
+		return false
+	case len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0:
+		return false
+	case pod.DeletionTimestamp != nil:
+		return false
+	}
+	return true
+}
+
+// isPodPhaseTerminal reports whether phase is Failed or Succeeded.
+func isPodPhaseTerminal(phase v1.PodPhase) bool {
+	switch phase {
+	case v1.PodFailed, v1.PodSucceeded:
+		return true
+	}
+	return false
+}
+
+// IsPodReady reports whether the pod's Ready condition is true.
+func IsPodReady(pod *v1.Pod) bool {
+ return IsPodReadyConditionTrue(pod.Status)
+}
+
+// IsPodReadyConditionTrue reports whether status contains a Ready condition
+// whose status is True.
+func IsPodReadyConditionTrue(status v1.PodStatus) bool {
+	if ready := GetPodReadyCondition(status); ready != nil {
+		return ready.Status == v1.ConditionTrue
+	}
+	return false
+}
+
+// GetPodReadyCondition returns the Ready condition from status, or nil when
+// status has no such condition.
+func GetPodReadyCondition(status v1.PodStatus) *v1.PodCondition {
+ _, condition := GetPodCondition(&status, v1.PodReady)
+ return condition
+}
+
+// GetPodCondition returns the index and a pointer to the condition of type
+// conditionType in status. It returns -1 and nil when status is nil or the
+// condition is absent.
+func GetPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
+	if status != nil {
+		return GetPodConditionFromList(status.Conditions, conditionType)
+	}
+	return -1, nil
+}
+
+// GetPodConditionFromList returns the index of, and a pointer to, the first
+// element of conditions whose type is conditionType. It returns -1 and nil
+// when there is no such element, including for a nil or empty list.
+func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
+	for idx := range conditions {
+		if cond := &conditions[idx]; cond.Type == conditionType {
+			return idx, cond
+		}
+	}
+	return -1, nil
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/util.go
b/sail/pkg/serviceregistry/kube/controller/util.go
index f8c5ecaf..cc9db054 100644
--- a/sail/pkg/serviceregistry/kube/controller/util.go
+++ b/sail/pkg/serviceregistry/kube/controller/util.go
@@ -2,6 +2,8 @@ package controller
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+ v1 "k8s.io/api/core/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/validation/field"
@@ -14,3 +16,14 @@ func labelRequirement(key string, op selection.Operator,
vals []string, opts ...
}
return out
}
+
+// getPodServices returns the entries of allServices whose Spec.Selector
+// matches pod's labels, in their original order. The result is nil when no
+// service matches.
+//
+// NOTE(review): namespaces are not compared here; presumably the caller only
+// passes services from the pod's own namespace — confirm.
+func getPodServices(allServices []*v1.Service, pod *v1.Pod) []*v1.Service {
+	var matched []*v1.Service
+	for i := range allServices {
+		svc := allServices[i]
+		selector := labels.Instance(svc.Spec.Selector)
+		if !selector.Match(pod.Labels) {
+			continue
+		}
+		matched = append(matched, svc)
+	}
+
+	return matched
+}
diff --git a/sail/pkg/serviceregistry/kube/conversion.go
b/sail/pkg/serviceregistry/kube/conversion.go
index 1d616bd1..68072953 100644
--- a/sail/pkg/serviceregistry/kube/conversion.go
+++ b/sail/pkg/serviceregistry/kube/conversion.go
@@ -150,3 +150,7 @@ func convertPort(port corev1.ServicePort) *model.Port {
func kubeToDubboServiceAccount(saname string, ns string, mesh
*meshconfig.MeshConfig) string {
return spiffe.MustGenSpiffeURI(mesh, ns, saname)
}
+
+// SecureNamingSAN returns the SPIFFE URI generated by spiffe.MustGenSpiffeURI
+// from mesh, the pod's namespace and the pod's service account name.
+func SecureNamingSAN(pod *corev1.Pod, mesh *meshconfig.MeshConfig) string {
+	namespace, serviceAccount := pod.Namespace, pod.Spec.ServiceAccountName
+	return spiffe.MustGenSpiffeURI(mesh, namespace, serviceAccount)
+}
diff --git a/sail/pkg/serviceregistry/serviceentry/controller.go
b/sail/pkg/serviceregistry/serviceentry/controller.go
deleted file mode 100644
index 5d116e7c..00000000
--- a/sail/pkg/serviceregistry/serviceentry/controller.go
+++ /dev/null
@@ -1,4 +0,0 @@
-package serviceentry
-
-type Controller struct {
-}
diff --git a/sail/pkg/serviceregistry/util/label/label.go
b/sail/pkg/serviceregistry/util/label/label.go
new file mode 100644
index 00000000..3eaecc5b
--- /dev/null
+++ b/sail/pkg/serviceregistry/util/label/label.go
@@ -0,0 +1,60 @@
+package label
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+ "github.com/apache/dubbo-kubernetes/pkg/network"
+ "istio.io/api/label"
+ "strings"
+)
+
+// Label keys declared by this package for workload labels.
+const (
+ // LabelHostname is the key under which AugmentLabels stores the node name.
+ LabelHostname = "kubernetes.io/hostname"
+
+ // Topology label keys for zone, subzone and region.
+ LabelTopologyZone = "topology.kubernetes.io/zone"
+ LabelTopologySubzone = "topology.dubbo.io/subzone"
+ LabelTopologyRegion = "topology.kubernetes.io/region"
+)
+
+// AugmentLabels returns a copy of in extended with topology labels: region,
+// zone and subzone (parsed from locality), cluster, node hostname and
+// network. Empty values are skipped and in itself is never modified. Apart
+// from the network label, a non-empty argument overwrites an entry of the
+// same key copied from in.
+//
+// NOTE(review): the subzone is stored under istio.io/api's
+// label.TopologySubzone.Name rather than this package's LabelTopologySubzone
+// constant — confirm which key is intended.
+func AugmentLabels(in labels.Instance, clusterID cluster.ID, locality, k8sNode string, networkID network.ID) labels.Instance {
+	// Reserve room for the up-to-six labels that may be added below.
+	augmented := make(labels.Instance, len(in)+6)
+	for key, value := range in {
+		augmented[key] = value
+	}
+
+	region, zone, subzone := SplitLocalityLabel(locality)
+	if region != "" {
+		augmented[LabelTopologyRegion] = region
+	}
+	if zone != "" {
+		augmented[LabelTopologyZone] = zone
+	}
+	if subzone != "" {
+		augmented[label.TopologySubzone.Name] = subzone
+	}
+	if len(clusterID) > 0 {
+		augmented[label.TopologyCluster.Name] = clusterID.String()
+	}
+	if k8sNode != "" {
+		augmented[LabelHostname] = k8sNode
+	}
+	// In c.Network(), the network label is already set in priority order
+	// pod labels > namespace label > mesh Network, so networkID only fills in
+	// a missing (or empty) network label and never overrides an existing one.
+	if len(networkID) > 0 && augmented[label.TopologyNetwork.Name] == "" {
+		augmented[label.TopologyNetwork.Name] = networkID.String()
+	}
+	return augmented
+}
+
+// SplitLocalityLabel splits a "region/zone/subzone" locality string on "/".
+// Missing trailing components are returned as empty strings, and any
+// components beyond the third are ignored.
+func SplitLocalityLabel(locality string) (region, zone, subzone string) {
+	// strings.Split with a non-empty separator always yields at least one
+	// element, so parts[0] is safe even for an empty locality.
+	parts := strings.Split(locality, "/")
+	region = parts[0]
+	if len(parts) > 1 {
+		zone = parts[1]
+	}
+	if len(parts) > 2 {
+		subzone = parts[2]
+	}
+	return region, zone, subzone
+}
diff --git a/sail/pkg/xds/ads.go b/sail/pkg/xds/ads.go
index 9d287319..f5bbd7d1 100644
--- a/sail/pkg/xds/ads.go
+++ b/sail/pkg/xds/ads.go
@@ -105,11 +105,8 @@ func (s *DiscoveryServer) adsClientCount() int {
return len(s.adsClients)
}
-func (s *DiscoveryServer) initProxyMetadata(node *core.Node) (*model.Proxy,
error) {
- return nil, nil
-}
-
func (s *DiscoveryServer) initConnection(node *core.Node, con *Connection,
identities []string) error {
+
return nil
}
@@ -129,6 +126,7 @@ func (s *DiscoveryServer) Stream(stream DiscoveryStream)
error {
if peerInfo, ok := peer.FromContext(ctx); ok {
peerAddr = peerInfo.Addr.String()
}
+
// TODO WaitForRequestLimit?
if err := s.WaitForRequestLimit(stream.Context()); err != nil {
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index 6ca0ea4a..636bedf7 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -164,6 +164,46 @@ func (s *DiscoveryServer) IsServerReady() bool {
return s.serverReady.Load()
}
+// ProxyUpdate enqueues a forced full push, with reason model.ProxyUpdate, for
+// the first initialized connection whose proxy belongs to clusterID and whose
+// first IP address equals ip. It does nothing when no such connection exists.
+func (s *DiscoveryServer) ProxyUpdate(clusterID cluster.ID, ip string) {
+	var connection *Connection
+
+	for _, v := range s.Clients() {
+		// A proxy that reported no IP addresses cannot match; skipping it
+		// also avoids an index-out-of-range panic on IPAddresses[0].
+		if len(v.proxy.IPAddresses) == 0 {
+			continue
+		}
+		if v.proxy.Metadata.ClusterID == clusterID && v.proxy.IPAddresses[0] == ip {
+			connection = v
+			break
+		}
+	}
+
+	// It is possible that the proxy has not connected to this discovery
+	// instance; it may be connected to another one.
+	if connection == nil {
+		return
+	}
+
+	s.pushQueue.Enqueue(connection, &model.PushRequest{
+		Full:   true,
+		Push:   s.globalPushContext(),
+		Start:  time.Now(),
+		Reason: model.NewReasonStats(model.ProxyUpdate),
+		Forced: true,
+	})
+}
+
+// Clients returns a snapshot of the ADS connections whose InitializedCh can
+// be received from without blocking. Connections that are not yet
+// initialized are left out. The read lock on adsClientsMutex is held while
+// the snapshot is taken.
+func (s *DiscoveryServer) Clients() []*Connection {
+	s.adsClientsMutex.RLock()
+	defer s.adsClientsMutex.RUnlock()
+
+	initialized := make([]*Connection, 0, len(s.adsClients))
+	for _, conn := range s.adsClients {
+		select {
+		case <-conn.InitializedCh():
+			initialized = append(initialized, conn)
+		default:
+			// Initialization not complete, skip.
+		}
+	}
+	return initialized
+}
+
func reasonsUpdated(req *model.PushRequest) string {
var (
reason0, reason1 model.TriggerReason
diff --git a/sail/pkg/xds/eds.go b/sail/pkg/xds/eds.go
index d2afda10..b35b3630 100644
--- a/sail/pkg/xds/eds.go
+++ b/sail/pkg/xds/eds.go
@@ -1,8 +1,12 @@
package xds
-import "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+)
-func (s *DiscoveryServer) SvcUpdate(shard model.ShardKey, hostname string,
namespace string, event model.Event) {
+func (s *DiscoveryServer) ServiceUpdate(shard model.ShardKey, hostname string,
namespace string, event model.Event) {
// When a service deleted, we should cleanup the endpoint shards and
also remove keys from EndpointIndex to
// prevent memory leaks.
if event == model.EventDelete {
@@ -10,3 +14,25 @@ func (s *DiscoveryServer) SvcUpdate(shard model.ShardKey,
hostname string, names
} else {
}
}
+
+// EDSUpdate stores dubboEndpoints for serviceName/namespace in the endpoint
+// index under shard and, when the index reports that an incremental or full
+// push is required, triggers a config update for that service.
+func (s *DiscoveryServer) EDSUpdate(shard model.ShardKey, serviceName string, namespace string,
+	dubboEndpoints []*model.DubboEndpoint,
+) {
+	// Update the endpoint shards and learn which kind of push, if any, is needed.
+	pushType := s.Env.EndpointIndex.UpdateServiceEndpoints(shard, serviceName, namespace, dubboEndpoints, true)
+	if pushType != model.IncrementalPush && pushType != model.FullPush {
+		return
+	}
+
+	// Trigger a push scoped to the updated service.
+	updated := model.ConfigKey{Kind: kind.ServiceEntry, Name: serviceName, Namespace: namespace}
+	s.ConfigUpdate(&model.PushRequest{
+		Full:           pushType == model.FullPush,
+		ConfigsUpdated: sets.New(updated),
+		Reason:         model.NewReasonStats(model.EndpointUpdate),
+	})
+}
+
+// EDSCacheUpdate stores dubboEndpoints for serviceName/namespace in the
+// endpoint index under shard. Unlike EDSUpdate it ignores the push type
+// returned by the index and never triggers a push.
+func (s *DiscoveryServer) EDSCacheUpdate(shard model.ShardKey, serviceName string, namespace string,
+	dubboEndpoints []*model.DubboEndpoint,
+) {
+	// Update the endpoint shards.
+	// NOTE(review): the final argument is false here and true in EDSUpdate;
+	// its meaning is defined by UpdateServiceEndpoints — confirm there.
+	s.Env.EndpointIndex.UpdateServiceEndpoints(shard, serviceName, namespace, dubboEndpoints, false)
+}
diff --git a/sail/pkg/xds/xdsgen.go b/sail/pkg/xds/xdsgen.go
index b3c7986c..cc2a0598 100644
--- a/sail/pkg/xds/xdsgen.go
+++ b/sail/pkg/xds/xdsgen.go
@@ -31,6 +31,7 @@ func (s *DiscoveryServer) pushXds(con *Connection, w
*model.WatchedResource, req
ResourceNames: req.Delta.Subscribed,
}
}
+
res, logdata, err := gen.Generate(con.proxy, w, req)
info := ""
if len(logdata.AdditionalInfo) > 0 {