This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new 44a04886 [discovery] xds get all k8s service object (#802)
44a04886 is described below
commit 44a04886389927ece20554edc60c3856dfcfa08b
Author: Jian Zhong <[email protected]>
AuthorDate: Mon Oct 13 13:58:46 2025 +0800
[discovery] xds get all k8s service object (#802)
---
pkg/config/constants/constants.go | 2 +
pkg/config/kube/conversion.go | 67 ++++++
pkg/config/schema/collections/collections.go | 45 ++++
pkg/config/schema/gvk/resources.go | 12 +
pkg/config/schema/gvr/resources.go | 6 +
pkg/config/schema/kind/resources.go | 3 +
pkg/config/schema/kubeclient/resources.go | 19 ++
pkg/config/schema/kubetypes/resources.go | 7 +
pkg/kube/multicluster/cluster.go | 23 ++
pkg/kube/multicluster/component.go | 65 +++++
pkg/kube/multicluster/secretcontroller.go | 95 ++++++++
sail/pkg/bootstrap/server.go | 47 +++-
sail/pkg/bootstrap/servicecontroller.go | 11 +-
sail/pkg/config/kube/crdclient/types.go | 19 ++
sail/pkg/model/addressmap.go | 17 ++
sail/pkg/model/controller.go | 22 ++
sail/pkg/model/endpointshards.go | 47 ++++
sail/pkg/model/push_context.go | 261 ++++++++++++++++++++-
sail/pkg/model/service.go | 178 +++++++++++++-
.../serviceregistry/kube/controller/controller.go | 152 ++++++++++++
.../kube/controller/endpointslice.go | 120 ++++++++++
.../kube/controller/multicluster.go | 94 ++++++++
sail/pkg/serviceregistry/kube/controller/util.go | 16 ++
sail/pkg/serviceregistry/kube/conversion.go | 152 ++++++++++++
.../pkg/serviceregistry/serviceentry/controller.go | 4 +
sail/pkg/xds/eds.go | 25 +-
26 files changed, 1467 insertions(+), 42 deletions(-)
diff --git a/pkg/config/constants/constants.go b/pkg/config/constants/constants.go
index 4c779b73..4bcfa552 100644
--- a/pkg/config/constants/constants.go
+++ b/pkg/config/constants/constants.go
@@ -18,6 +18,8 @@
package constants
const (
+ UnspecifiedIP = "0.0.0.0"
+
DubboSystemNamespace = "dubbo-system"
DefaultClusterLocalDomain = "cluster.local"
DefaultClusterName = "Kubernetes"
diff --git a/pkg/config/kube/conversion.go b/pkg/config/kube/conversion.go
new file mode 100644
index 00000000..78918c14
--- /dev/null
+++ b/pkg/config/kube/conversion.go
@@ -0,0 +1,67 @@
+package kube
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config/protocol"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ corev1 "k8s.io/api/core/v1"
+ "strings"
+)
+
+var (
+ grpcWeb = string(protocol.GRPCWeb)
+ grpcWebLen = len(grpcWeb)
+)
+
+const (
+ DNS = 53
+)
+
+var wellKnownPorts = sets.New[int32](DNS)
+
+func ConvertProtocol(port int32, portName string, proto corev1.Protocol, appProto *string) protocol.Instance {
+ if proto == corev1.ProtocolUDP {
+ return protocol.UDP
+ }
+
+ // If application protocol is set, we will use that
+ // If not, use the port name
+ name := portName
+ if appProto != nil {
+ name = *appProto
+ // Kubernetes has a few AppProtocol specific standard names defined in the Service spec
+ // Handle these only for AppProtocol (name cannot have these values, anyways).
+ // https://github.com/kubernetes/kubernetes/blob/b4140391cf39ea54cd0227e294283bfb5718c22d/staging/src/k8s.io/api/core/v1/generated.proto#L1245-L1248
+ switch name {
+ // "http2 over cleartext", which is also what our HTTP2 port is
+ case "kubernetes.io/h2c":
+ return protocol.HTTP2
+ // WebSocket over cleartext
+ case "kubernetes.io/ws":
+ return protocol.HTTP
+ // WebSocket over TLS
+ case "kubernetes.io/wss":
+ return protocol.HTTPS
+ }
+ }
+
+ // Check if the port name prefix is "grpc-web". Need to do this before the general
+ // prefix check below, since it contains a hyphen.
+ if len(name) >= grpcWebLen && strings.EqualFold(name[:grpcWebLen], grpcWeb) {
+ return protocol.GRPCWeb
+ }
+
+ // Parse the port name to find the prefix, if any.
+ i := strings.IndexByte(name, '-')
+ if i >= 0 {
+ name = name[:i]
+ }
+
+ p := protocol.Parse(name)
+ if p == protocol.Unsupported {
+ // Use TCP as the default protocol for well-known ports if the protocol is not specified.
+ if wellKnownPorts.Contains(port) {
+ return protocol.TCP
+ }
+ }
+ return p
+}
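For orientation (illustrative only, not part of the patch): a minimal sketch of how the port-name inference above behaves. It assumes the repository's protocol.Parse recognizes the usual name prefixes (http, grpc, tcp, ...) and that the new package is importable at the path shown.

package main

import (
	"fmt"

	"github.com/apache/dubbo-kubernetes/pkg/config/kube"
	corev1 "k8s.io/api/core/v1"
)

func main() {
	// appProtocol takes precedence over the port name; the Kubernetes
	// standard name "kubernetes.io/h2c" maps to HTTP2.
	h2c := "kubernetes.io/h2c"
	fmt.Println(kube.ConvertProtocol(8080, "http-web", corev1.ProtocolTCP, &h2c))

	// Without appProtocol, the prefix before the first '-' is parsed;
	// "grpc-web" is special-cased before the generic prefix split.
	fmt.Println(kube.ConvertProtocol(9090, "grpc-web-echo", corev1.ProtocolTCP, nil))

	// A name the parser does not recognize on well-known port 53 falls back to TCP.
	fmt.Println(kube.ConvertProtocol(53, "dns", corev1.ProtocolTCP, nil))
}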
diff --git a/pkg/config/schema/collections/collections.go b/pkg/config/schema/collections/collections.go
index 457283ad..e94dc2d4 100644
--- a/pkg/config/schema/collections/collections.go
+++ b/pkg/config/schema/collections/collections.go
@@ -9,6 +9,8 @@ import (
istioioapinetworkingv1alpha3 "istio.io/api/networking/v1alpha3"
istioioapisecurityv1beta1 "istio.io/api/security/v1beta1"
k8sioapiadmissionregistrationv1 "k8s.io/api/admissionregistration/v1"
+ k8sioapicorev1 "k8s.io/api/core/v1"
+ k8sioapidiscoveryv1 "k8s.io/api/discovery/v1"
"reflect"
)
@@ -105,12 +107,55 @@ var (
Synthetic: false,
Builtin: true,
}.MustBuild()
+ EndpointSlice = collection.Builder{
+ Identifier: "EndpointSlice",
+ Group: "discovery.k8s.io",
+ Kind: "EndpointSlice",
+ Plural: "endpointslices",
+ Version: "v1",
+ Proto: "k8s.io.api.discovery.v1.EndpointSlice",
+ ReflectType: reflect.TypeOf(&k8sioapidiscoveryv1.EndpointSlice{}).Elem(),
+ ProtoPackage: "k8s.io/api/discovery/v1",
+ ClusterScoped: false,
+ Synthetic: false,
+ Builtin: true,
+ }.MustBuild()
+
+ Endpoints = collection.Builder{
+ Identifier: "Endpoints",
+ Group: "",
+ Kind: "Endpoints",
+ Plural: "endpoints",
+ Version: "v1",
+ Proto: "k8s.io.api.core.v1.Endpoints",
+ ReflectType: reflect.TypeOf(&k8sioapicorev1.Endpoints{}).Elem(),
+ ProtoPackage: "k8s.io/api/core/v1",
+ ClusterScoped: false,
+ Synthetic: false,
+ Builtin: true,
+ }.MustBuild()
+ Service = collection.Builder{
+ Identifier: "Service",
+ Group: "",
+ Kind: "Service",
+ Plural: "services",
+ Version: "v1",
+ Proto: "k8s.io.api.core.v1.ServiceSpec", StatusProto: "k8s.io.api.core.v1.ServiceStatus",
+ ReflectType: reflect.TypeOf(&k8sioapicorev1.ServiceSpec{}).Elem(), StatusType: reflect.TypeOf(&k8sioapicorev1.ServiceStatus{}).Elem(),
+ ProtoPackage: "k8s.io/api/core/v1", StatusPackage: "k8s.io/api/core/v1",
+ ClusterScoped: false,
+ Synthetic: false,
+ Builtin: true,
+ }.MustBuild()
All = collection.NewSchemasBuilder().
MustAdd(PeerAuthentication).
MustAdd(RequestAuthentication).
MustAdd(DestinationRule).
MustAdd(VirtualService).
+ MustAdd(EndpointSlice).
+ MustAdd(Endpoints).
+ MustAdd(Service).
MustAdd(MutatingWebhookConfiguration).
MustAdd(ValidatingWebhookConfiguration).
Build()
diff --git a/pkg/config/schema/gvk/resources.go b/pkg/config/schema/gvk/resources.go
index 8fb566d9..36dc2a91 100644
--- a/pkg/config/schema/gvk/resources.go
+++ b/pkg/config/schema/gvk/resources.go
@@ -42,6 +42,8 @@ var (
AuthorizationPolicy = config.GroupVersionKind{Group: "security.dubbo.io", Version: "v1", Kind: "AuthorizationPolicy"}
DestinationRule = config.GroupVersionKind{Group: "networking.dubbo.io", Version: "v1", Kind: "DestinationRule"}
VirtualService = config.GroupVersionKind{Group: "networking.dubbo.io", Version: "v1", Kind: "VirtualService"}
+ EndpointSlice = config.GroupVersionKind{Group: "discovery.k8s.io", Version: "v1", Kind: "EndpointSlice"}
+ Endpoints = config.GroupVersionKind{Group: "", Version: "v1", Kind: "Endpoints"}
)
func ToGVR(g config.GroupVersionKind) (schema.GroupVersionResource, bool) {
@@ -82,6 +84,10 @@ func ToGVR(g config.GroupVersionKind) (schema.GroupVersionResource, bool) {
return gvr.DestinationRule, true
case VirtualService:
return gvr.VirtualService, true
+ case EndpointSlice:
+ return gvr.EndpointSlice, true
+ case Endpoints:
+ return gvr.Endpoints, true
}
return schema.GroupVersionResource{}, false
}
@@ -121,6 +127,12 @@ func FromGVR(g schema.GroupVersionResource) (config.GroupVersionKind, bool) {
return VirtualService, true
case gvr.DestinationRule:
return DestinationRule, true
+ case gvr.EndpointSlice:
+ return EndpointSlice, true
+ case gvr.Endpoints:
+ return Endpoints, true
+ case gvr.Service:
+ return Service, true
}
return config.GroupVersionKind{}, false
}
diff --git a/pkg/config/schema/gvr/resources.go b/pkg/config/schema/gvr/resources.go
index 35f8c75e..86f1d054 100644
--- a/pkg/config/schema/gvr/resources.go
+++ b/pkg/config/schema/gvr/resources.go
@@ -40,6 +40,8 @@ var (
AuthorizationPolicy = schema.GroupVersionResource{Group: "security.dubbo.io", Version: "v1", Resource: "authorizationpolicies"}
DestinationRule = schema.GroupVersionResource{Group: "networking.dubbo.io", Version: "v1", Resource: "destinationrules"}
VirtualService = schema.GroupVersionResource{Group: "networking.dubbo.io", Version: "v1", Resource: "virtualservices"}
+ EndpointSlice = schema.GroupVersionResource{Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}
+ Endpoints = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"}
)
func IsClusterScoped(g schema.GroupVersionResource) bool {
@@ -74,6 +76,10 @@ func IsClusterScoped(g schema.GroupVersionResource) bool {
return true
case ValidatingWebhookConfiguration:
return true
+ case EndpointSlice:
+ return false
+ case Endpoints:
+ return false
}
return false
}
diff --git a/pkg/config/schema/kind/resources.go b/pkg/config/schema/kind/resources.go
index 80ce5182..de7dab64 100644
--- a/pkg/config/schema/kind/resources.go
+++ b/pkg/config/schema/kind/resources.go
@@ -19,6 +19,7 @@ const (
RequestAuthentication
VirtualService
DestinationRule
+ DNSName
)
func (k Kind) String() string {
@@ -57,6 +58,8 @@ func (k Kind) String() string {
return "VirtualService"
case DestinationRule:
return "DestinationRule"
+ case DNSName:
+ return "DNSName"
default:
return "Unknown"
}
diff --git a/pkg/config/schema/kubeclient/resources.go b/pkg/config/schema/kubeclient/resources.go
index d06deac2..ff003d02 100644
--- a/pkg/config/schema/kubeclient/resources.go
+++ b/pkg/config/schema/kubeclient/resources.go
@@ -30,6 +30,7 @@ import (
k8sioapiappsv1 "k8s.io/api/apps/v1"
k8sioapicertificatesv1 "k8s.io/api/certificates/v1"
k8sioapicorev1 "k8s.io/api/core/v1"
+ k8sioapidiscoveryv1 "k8s.io/api/discovery/v1"
k8sioapipolicyv1 "k8s.io/api/policy/v1"
k8sioapiextensionsapiserverpkgapisapiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -97,6 +98,10 @@ func gvrToObject(g schema.GroupVersionResource) runtime.Object {
return &k8sioapicorev1.Namespace{}
case gvr.Secret:
return &k8sioapicorev1.Secret{}
+ case gvr.EndpointSlice:
+ return &k8sioapidiscoveryv1.EndpointSlice{}
+ case gvr.Endpoints:
+ return &k8sioapicorev1.Endpoints{}
case gvr.Service:
return &k8sioapicorev1.Service{}
case gvr.ServiceAccount:
@@ -167,6 +172,20 @@ func getInformerFiltered(c ClientGetter, opts ktypes.InformerOptions, g schema.G
w = func(options metav1.ListOptions) (watch.Interface, error) {
return c.Kube().CoreV1().Secrets(opts.Namespace).Watch(context.Background(), options)
}
+ case gvr.EndpointSlice:
+ l = func(options metav1.ListOptions) (runtime.Object, error) {
+ return c.Kube().DiscoveryV1().EndpointSlices(opts.Namespace).List(context.Background(), options)
+ }
+ w = func(options metav1.ListOptions) (watch.Interface, error) {
+ return c.Kube().DiscoveryV1().EndpointSlices(opts.Namespace).Watch(context.Background(), options)
+ }
+ case gvr.Endpoints:
+ l = func(options metav1.ListOptions) (runtime.Object, error) {
+ return c.Kube().CoreV1().Endpoints(opts.Namespace).List(context.Background(), options)
+ }
+ w = func(options metav1.ListOptions) (watch.Interface, error) {
+ return c.Kube().CoreV1().Endpoints(opts.Namespace).Watch(context.Background(), options)
+ }
case gvr.Service:
l = func(options metav1.ListOptions) (runtime.Object, error) {
return c.Kube().CoreV1().Services(opts.Namespace).List(context.Background(), options)
diff --git a/pkg/config/schema/kubetypes/resources.go b/pkg/config/schema/kubetypes/resources.go
index fd826128..562e8a54 100644
--- a/pkg/config/schema/kubetypes/resources.go
+++ b/pkg/config/schema/kubetypes/resources.go
@@ -25,6 +25,7 @@ import (
apiistioioapisecurityv1 "istio.io/client-go/pkg/apis/security/v1"
k8sioapiadmissionregistrationv1 "k8s.io/api/admissionregistration/v1"
k8sioapicorev1 "k8s.io/api/core/v1"
+ k8sioapidiscoveryv1 "k8s.io/api/discovery/v1"
)
func getGvk(obj any) (config.GroupVersionKind, bool) {
@@ -49,6 +50,12 @@ func getGvk(obj any) (config.GroupVersionKind, bool) {
return gvk.MutatingWebhookConfiguration, true
case *k8sioapiadmissionregistrationv1.ValidatingWebhookConfiguration:
return gvk.ValidatingWebhookConfiguration, true
+ case *k8sioapidiscoveryv1.EndpointSlice:
+ return gvk.EndpointSlice, true
+ case *k8sioapicorev1.Endpoints:
+ return gvk.Endpoints, true
+ case *k8sioapicorev1.Service:
+ return gvk.Service, true
default:
return config.GroupVersionKind{}, false
}
diff --git a/pkg/kube/multicluster/cluster.go b/pkg/kube/multicluster/cluster.go
new file mode 100644
index 00000000..932becd8
--- /dev/null
+++ b/pkg/kube/multicluster/cluster.go
@@ -0,0 +1,23 @@
+package multicluster
+
+import (
+ "crypto/sha256"
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "go.uber.org/atomic"
+)
+
+type Cluster struct {
+ // ID of the cluster.
+ ID cluster.ID
+ // Client for accessing the cluster.
+ Client kube.Client
+
+ kubeConfigSha [sha256.Size]byte
+
+ stop chan struct{}
+ // initialSync is marked when RunAndWait completes
+ initialSync *atomic.Bool
+ // initialSyncTimeout is set when RunAndWait times out
+ initialSyncTimeout *atomic.Bool
+}
diff --git a/pkg/kube/multicluster/component.go b/pkg/kube/multicluster/component.go
new file mode 100644
index 00000000..a673ce1a
--- /dev/null
+++ b/pkg/kube/multicluster/component.go
@@ -0,0 +1,65 @@
+package multicluster
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/maps"
+ "sync"
+)
+
+type ComponentConstraint interface {
+ Close()
+ HasSynced() bool
+}
+
+type Component[T ComponentConstraint] struct {
+ mu sync.RWMutex
+ constructor func(cluster *Cluster) T
+ clusters map[cluster.ID]T
+}
+
+func (m *Component[T]) clusterAdded(cluster *Cluster) ComponentConstraint {
+ comp := m.constructor(cluster)
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.clusters[cluster.ID] = comp
+ return comp
+}
+
+func (m *Component[T]) clusterUpdated(cluster *Cluster) ComponentConstraint {
+ // Build outside of the lock, in case it's slow
+ comp := m.constructor(cluster)
+ old, f := m.clusters[cluster.ID]
+ m.mu.Lock()
+ m.clusters[cluster.ID] = comp
+ m.mu.Unlock()
+ // Close outside of the lock, in case it's slow
+ if f {
+ old.Close()
+ }
+ return comp
+}
+
+func (m *Component[T]) clusterDeleted(cluster cluster.ID) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ // If there is an old one, close it
+ if old, f := m.clusters[cluster]; f {
+ old.Close()
+ }
+ delete(m.clusters, cluster)
+}
+
+func (m *Component[T]) All() []T {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ return maps.Values(m.clusters)
+}
+
+func (m *Component[T]) HasSynced() bool {
+ for _, c := range m.All() {
+ if !c.HasSynced() {
+ return false
+ }
+ }
+ return true
+}
diff --git a/pkg/kube/multicluster/secretcontroller.go b/pkg/kube/multicluster/secretcontroller.go
new file mode 100644
index 00000000..28d82b85
--- /dev/null
+++ b/pkg/kube/multicluster/secretcontroller.go
@@ -0,0 +1,95 @@
+package multicluster
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+ "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/rest"
+ "sync"
+)
+
+type ClusterStore struct {
+ sync.RWMutex
+ clusters sets.String
+}
+
+type Controller struct {
+ namespace string
+ configClusterID cluster.ID
+ configCluster *Cluster
+ configClusterSyncers []ComponentConstraint
+
+ queue controllers.Queue
+ secrets kclient.Client[*corev1.Secret]
+ configOverrides []func(*rest.Config)
+
+ cs *ClusterStore
+
+ meshWatcher mesh.Watcher
+ handlers []handler
+}
+
+func NewController(kubeclientset kube.Client, namespace string, clusterID cluster.ID,
+ meshWatcher mesh.Watcher, configOverrides ...func(*rest.Config),
+) *Controller {
+ controller := &Controller{
+ namespace: namespace,
+ configClusterID: clusterID,
+ configCluster: &Cluster{Client: kubeclientset, ID: clusterID},
+
+ configOverrides: configOverrides,
+ meshWatcher: meshWatcher,
+ }
+ return controller
+}
+
+func (c *Controller) Run(stopCh <-chan struct{}) error {
+ // run handlers for the config cluster; do not store this *Cluster in the ClusterStore or give it a SyncTimeout.
+ // This is done outside the goroutine; we should block other Run/startFuncs until this is registered.
+ c.configClusterSyncers = c.handleAdd(c.configCluster)
+
+ return nil
+}
+
+func (c *Controller) handleAdd(cluster *Cluster) []ComponentConstraint {
+ syncers := make([]ComponentConstraint, 0, len(c.handlers))
+ for _, handler := range c.handlers {
+ syncers = append(syncers, handler.clusterAdded(cluster))
+ }
+ return syncers
+}
+
+func (c *Controller) handleDelete(key cluster.ID) {
+ for _, handler := range c.handlers {
+ handler.clusterDeleted(key)
+ }
+}
+
+type handler interface {
+ clusterAdded(cluster *Cluster) ComponentConstraint
+ clusterUpdated(cluster *Cluster) ComponentConstraint
+ clusterDeleted(clusterID cluster.ID)
+ HasSynced() bool
+}
+
+type ComponentBuilder interface {
+ registerHandler(h handler)
+}
+
+func BuildMultiClusterComponent[T ComponentConstraint](c ComponentBuilder, constructor func(cluster *Cluster) T) *Component[T] {
+ comp := &Component[T]{
+ constructor: constructor,
+ clusters: make(map[cluster.ID]T),
+ }
+ c.registerHandler(comp)
+ return comp
+}
+
+func (c *Controller) registerHandler(h handler) {
+ // Intentionally no lock. The controller today requires that handlers are registered before execution and not in parallel.
+ c.handlers = append(c.handlers, h)
+}
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index d601fcd0..6207b934 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -34,6 +34,7 @@ import (
kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/kube/inject"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/multicluster"
"github.com/apache/dubbo-kubernetes/pkg/kube/namespace"
sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/network"
@@ -46,6 +47,7 @@ import (
"github.com/apache/dubbo-kubernetes/sail/pkg/server"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/serviceentry"
tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
"github.com/apache/dubbo-kubernetes/sail/pkg/xds"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
@@ -94,8 +96,10 @@ type Server struct {
httpMux *http.ServeMux
httpsMux *http.ServeMux // webhooks
- ConfigStores []model.ConfigStoreController
- configController model.ConfigStoreController
+ ConfigStores []model.ConfigStoreController
+ configController model.ConfigStoreController
+ multiclusterController *multicluster.Controller
+ serviceEntryController *serviceentry.Controller
fileWatcher filewatcher.FileWatcher
internalStop chan struct{}
@@ -385,6 +389,19 @@ func (s *Server) startCA(caOpts *caOptions) {
})
}
+func (s *Server) initMulticluster(args *SailArgs) {
+ if s.kubeClient == nil {
+ return
+ }
+ s.multiclusterController = multicluster.NewController(s.kubeClient, args.Namespace, s.clusterID, s.environment.Watcher, func(r *rest.Config) {
+ r.QPS = args.RegistryOptions.KubeOptions.KubernetesAPIQPS
+ r.Burst = args.RegistryOptions.KubeOptions.KubernetesAPIBurst
+ })
+ s.addStartFunc("multicluster controller", func(stop <-chan struct{}) error {
+ return s.multiclusterController.Run(stop)
+ })
+}
+
func (s *Server) initKubeClient(args *SailArgs) error {
if s.kubeClient != nil {
// Already initialized by startup arguments
@@ -479,7 +496,8 @@ func (s *Server) initGrpcServer(options *dubbokeepalive.Options) {
func (s *Server) initControllers(args *SailArgs) error {
klog.Info("initializing controllers")
- // TODO initMulticluster
+
+ s.initMulticluster(args)
s.initSDSServer()
@@ -673,12 +691,6 @@ func (s *Server) initSDSServer() {
klog.Warningf("skipping Kubernetes credential reader;
SAIL_ENABLE_XDS_IDENTITY_CHECK must be set to true for this feature.")
} else {
// TODO ConfigUpdated Multicluster get secret and configmap
- s.XDSServer.ConfigUpdate(&model.PushRequest{
- Full: false,
- ConfigsUpdated: nil,
- Reason: model.NewReasonStats(model.SecretTrigger),
- })
-
}
}
@@ -827,6 +839,23 @@ func (s *Server) dubbodReadyHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}
+func (s *Server) shouldStartNsController() bool {
+ if s.isK8SSigning() {
+ // Need to distribute the roots from MeshConfig
+ return true
+ }
+ if s.CA == nil {
+ return false
+ }
+
+ // For no CA we don't distribute it either, as there is no cert
+ if features.SailCertProvider == constants.CertProviderNone {
+ return false
+ }
+
+ return true
+}
+
func getDNSNames(args *SailArgs, host string) []string {
// Append custom hostname if there is any
customHost := features.DubbodServiceCustomHost
diff --git a/sail/pkg/bootstrap/servicecontroller.go b/sail/pkg/bootstrap/servicecontroller.go
index 70cac1bd..cf59a83d 100644
--- a/sail/pkg/bootstrap/servicecontroller.go
+++ b/sail/pkg/bootstrap/servicecontroller.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+ kubecontroller "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/kube/controller"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
"k8s.io/klog/v2"
)
@@ -55,5 +56,13 @@ func (s *Server) initKubeRegistry(args *SailArgs) (err error) {
args.RegistryOptions.KubeOptions.MeshWatcher = s.environment.Watcher
args.RegistryOptions.KubeOptions.SystemNamespace = args.Namespace
args.RegistryOptions.KubeOptions.MeshServiceController = s.ServiceController()
- return
+ kubecontroller.NewMulticluster(args.PodName,
+ args.RegistryOptions.KubeOptions,
+ s.dubbodCertBundleWatcher,
+ args.Revision,
+ s.shouldStartNsController(),
+ s.environment.ClusterLocal(),
+ s.server,
+ s.multiclusterController)
+ return err
}
diff --git a/sail/pkg/config/kube/crdclient/types.go b/sail/pkg/config/kube/crdclient/types.go
index 778b833d..10f2fcbf 100644
--- a/sail/pkg/config/kube/crdclient/types.go
+++ b/sail/pkg/config/kube/crdclient/types.go
@@ -14,6 +14,7 @@ import (
k8sioapiadmissionregistrationv1 "k8s.io/api/admissionregistration/v1"
k8sioapiappsv1 "k8s.io/api/apps/v1"
k8sioapicorev1 "k8s.io/api/core/v1"
+ k8sioapidiscoveryv1 "k8s.io/api/discovery/v1"
k8sioapiextensionsapiserverpkgapisapiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -466,4 +467,22 @@ var translationMap = map[config.GroupVersionKind]func(r runtime.Object) config.C
Status: &obj.Status,
}
},
+ gvk.EndpointSlice: func(r runtime.Object) config.Config {
+ obj := r.(*k8sioapidiscoveryv1.EndpointSlice)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.EndpointSlice,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: obj,
+ }
+ },
}
diff --git a/sail/pkg/model/addressmap.go b/sail/pkg/model/addressmap.go
index c8b166f3..01623908 100644
--- a/sail/pkg/model/addressmap.go
+++ b/sail/pkg/model/addressmap.go
@@ -73,3 +73,20 @@ func (m *AddressMap) Len() int {
return len(m.Addresses)
}
+
+func (m *AddressMap) AddAddressesFor(c cluster.ID, addresses []string) *AddressMap {
+ if len(addresses) == 0 {
+ return m
+ }
+
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ // Create the map if nil.
+ if m.Addresses == nil {
+ m.Addresses = make(map[cluster.ID][]string)
+ }
+
+ m.Addresses[c] = append(m.Addresses[c], addresses...)
+ return m
+}
diff --git a/sail/pkg/model/controller.go b/sail/pkg/model/controller.go
index 72e2dd10..775ac50d 100644
--- a/sail/pkg/model/controller.go
+++ b/sail/pkg/model/controller.go
@@ -1,10 +1,19 @@
package model
+import "sync"
+
type Controller interface {
Run(stop <-chan struct{})
HasSynced() bool
}
+type ServiceHandler func(*Service, *Service, Event)
+
+type ControllerHandlers struct {
+ mutex sync.RWMutex
+ serviceHandlers []ServiceHandler
+}
+
type AggregateController interface {
Controller
}
@@ -29,3 +38,16 @@ func (event Event) String() string {
}
return out
}
+
+func (c *ControllerHandlers) NotifyServiceHandlers(prev, curr *Service, event Event) {
+ for _, f := range c.GetServiceHandlers() {
+ f(prev, curr, event)
+ }
+}
+
+func (c *ControllerHandlers) GetServiceHandlers() []ServiceHandler {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ // Return a shallow copy of the array
+ return c.serviceHandlers
+}
diff --git a/sail/pkg/model/endpointshards.go b/sail/pkg/model/endpointshards.go
index 43e4cbfe..dd87f6fd 100644
--- a/sail/pkg/model/endpointshards.go
+++ b/sail/pkg/model/endpointshards.go
@@ -88,6 +88,44 @@ func endpointUpdateRequiresPush(oldDubboEndpoints []*DubboEndpoint, incomingEndp
return newDubboEndpoints, needPush
}
+func (e *EndpointIndex) ShardsForService(serviceName, namespace string) (*EndpointShards, bool) {
+ e.mu.RLock()
+ defer e.mu.RUnlock()
+ byNs, ok := e.shardsBySvc[serviceName]
+ if !ok {
+ return nil, false
+ }
+ shards, ok := byNs[namespace]
+ return shards, ok
+}
+
+func (es *EndpointShards) CopyEndpoints(portMap map[string]int, ports sets.Set[int]) map[int][]*DubboEndpoint {
+ es.RLock()
+ defer es.RUnlock()
+ res := map[int][]*DubboEndpoint{}
+ for _, v := range es.Shards {
+ for _, ep := range v {
+ // use the port name as the key, unless LegacyClusterPortKey is set and takes precedence
+ // In EDS we match on port *name*. But for historical reasons, we match on port number for CDS.
+ var portNum int
+ if ep.LegacyClusterPortKey != 0 {
+ if !ports.Contains(ep.LegacyClusterPortKey) {
+ continue
+ }
+ portNum = ep.LegacyClusterPortKey
+ } else {
+ pn, f := portMap[ep.ServicePortName]
+ if !f {
+ continue
+ }
+ portNum = pn
+ }
+ res[portNum] = append(res[portNum], ep)
+ }
+ }
+ return res
+}
+
func (e *EndpointIndex) UpdateServiceEndpoints(
shard ShardKey,
hostname string,
@@ -216,3 +254,12 @@ func (e *EndpointIndex) deleteServiceInner(shard ShardKey, serviceName, namespac
}
epShards.Unlock()
}
+
+type shardRegistry interface {
+ Cluster() cluster.ID
+ Provider() provider.ID
+}
+
+func ShardKeyFromRegistry(instance shardRegistry) ShardKey {
+ return ShardKey{Cluster: instance.Cluster(), Provider: instance.Provider()}
+}
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index a6fde7c4..5d0cd60f 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -18,15 +18,20 @@
package model
import (
+ "cmp"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/spiffe"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/pkg/xds"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
"go.uber.org/atomic"
meshconfig "istio.io/api/mesh/v1alpha1"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/klog/v2"
"sync"
"time"
)
@@ -40,11 +45,10 @@ var (
type TriggerReason string
const (
- EndpointUpdate TriggerReason = "endpoint"
- UnknownTrigger TriggerReason = "unknown"
- ProxyRequest TriggerReason = "proxyrequest"
- GlobalUpdate TriggerReason = "global"
- SecretTrigger TriggerReason = "secret"
+ UnknownTrigger TriggerReason = "unknown"
+ ProxyRequest TriggerReason = "proxyrequest"
+ GlobalUpdate TriggerReason = "global"
+ HeadlessEndpointUpdate TriggerReason = "headlessendpoint"
)
type ProxyPushStatus struct {
@@ -102,7 +106,20 @@ type PushRequest struct {
type ResourceDelta = xds.ResourceDelta
func NewPushContext() *PushContext {
- return &PushContext{}
+ return &PushContext{
+ ServiceIndex: newServiceIndex(),
+ serviceAccounts: map[serviceAccountKey][]string{},
+ }
+}
+
+func newServiceIndex() serviceIndex {
+ return serviceIndex{
+ public: []*Service{},
+ privateByNamespace: map[string][]*Service{},
+ exportedToNamespace: map[string][]*Service{},
+ HostnameAndNamespace: map[host.Name]map[string]*Service{},
+ instancesByPort: map[string]map[int][]*DubboEndpoint{},
+ }
}
type ConfigKey struct {
@@ -130,8 +147,8 @@ func (pr *PushRequest) CopyMerge(other *PushRequest) *PushRequest {
}
type XDSUpdater interface {
- EDSUpdate(shard ShardKey, hostname string, namespace string, entry []*DubboEndpoint)
ConfigUpdate(req *PushRequest)
+ SvcUpdate(shard ShardKey, hostname string, namespace string, event Event)
}
func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
@@ -189,9 +206,20 @@ func (ps *PushContext) initDefaultExportMaps() {
}
}
-func (ps *PushContext) createNewContext(env *Environment) {}
+func (ps *PushContext) createNewContext(env *Environment) {
+ ps.initServiceRegistry(env, nil)
+}
func (ps *PushContext) updateContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
+ var servicesChanged bool
+ if servicesChanged {
+ // Services have changed. Initialize the service registry.
+ ps.initServiceRegistry(env, pushReq.ConfigsUpdated)
+ } else {
+ // make sure we copy over things that would be generated in initServiceRegistry
+ ps.ServiceIndex = oldPushContext.ServiceIndex
+ ps.serviceAccounts = oldPushContext.serviceAccounts
+ }
}
func (pr *PushRequest) Merge(other *PushRequest) *PushRequest {
@@ -287,6 +315,223 @@ func (ps *PushContext) servicesExportedToNamespace(ns string) []*Service {
return out
}
+func (ps *PushContext) initServiceRegistry(env *Environment, configsUpdate sets.Set[ConfigKey]) {
+ // Sort the services in order of creation.
+ allServices := SortServicesByCreationTime(env.Services())
+ resolveServiceAliases(allServices, configsUpdate)
+
+ for _, s := range allServices {
+ portMap := map[string]int{}
+ ports := sets.New[int]()
+ for _, port := range s.Ports {
+ portMap[port.Name] = port.Port
+ ports.Insert(port.Port)
+ }
+
+ svcKey := s.Key()
+ if _, ok := ps.ServiceIndex.instancesByPort[svcKey]; !ok {
+ ps.ServiceIndex.instancesByPort[svcKey] = make(map[int][]*DubboEndpoint)
+ }
+ shards, ok := env.EndpointIndex.ShardsForService(string(s.Hostname), s.Attributes.Namespace)
+ if ok {
+ instancesByPort := shards.CopyEndpoints(portMap, ports)
+ // Iterate over the instances and add them to the service index to avoid overriding the existing port instances.
+ for port, instances := range instancesByPort {
+ ps.ServiceIndex.instancesByPort[svcKey][port] = instances
+ }
+ }
+ if _, f := ps.ServiceIndex.HostnameAndNamespace[s.Hostname]; !f {
+ ps.ServiceIndex.HostnameAndNamespace[s.Hostname] = map[string]*Service{}
+ }
+ // In some scenarios, there may be multiple Services defined for the same hostname due to ServiceEntry allowing
+ // arbitrary hostnames. In these cases, we want to pick the first Service, which is the oldest. This ensures
+ // newly created Services cannot take ownership unexpectedly.
+ // However, if the Service is from Kubernetes it should take precedence over ones that are not. This prevents someone from
+ // "domain squatting" on the hostname before a Kubernetes Service is created.
+ if existing := ps.ServiceIndex.HostnameAndNamespace[s.Hostname][s.Attributes.Namespace]; existing != nil &&
+ !(existing.Attributes.ServiceRegistry != provider.Kubernetes && s.Attributes.ServiceRegistry == provider.Kubernetes) {
+ klog.V(2).Infof("Service %s/%s from registry %s ignored by %s/%s/%s", s.Attributes.Namespace, s.Hostname, s.Attributes.ServiceRegistry,
+ existing.Attributes.ServiceRegistry, existing.Attributes.Namespace, existing.Hostname)
+ } else {
+ ps.ServiceIndex.HostnameAndNamespace[s.Hostname][s.Attributes.Namespace] = s
+ }
+
+ ns := s.Attributes.Namespace
+ if s.Attributes.ExportTo.IsEmpty() {
+ if ps.exportToDefaults.service.Contains(visibility.Private) {
+ ps.ServiceIndex.privateByNamespace[ns] = append(ps.ServiceIndex.privateByNamespace[ns], s)
+ } else if ps.exportToDefaults.service.Contains(visibility.Public) {
+ ps.ServiceIndex.public = append(ps.ServiceIndex.public, s)
+ }
+ } else {
+ // if service has exportTo *, make it public and ignore all other exportTos.
+ // if service does not have exportTo *, but has exportTo ~ - i.e. not visible to anyone, ignore all exportTos.
+ // if service has exportTo ., replace with current namespace.
+ if s.Attributes.ExportTo.Contains(visibility.Public) {
+ ps.ServiceIndex.public = append(ps.ServiceIndex.public, s)
+ continue
+ } else if s.Attributes.ExportTo.Contains(visibility.None) {
+ continue
+ }
+ // . or other namespaces
+ for exportTo := range s.Attributes.ExportTo {
+ if exportTo == visibility.Private || string(exportTo) == ns {
+ // exportTo with same namespace is effectively private
+ ps.ServiceIndex.privateByNamespace[ns] = append(ps.ServiceIndex.privateByNamespace[ns], s)
+ } else {
+ // exportTo is a specific target namespace
+ ps.ServiceIndex.exportedToNamespace[string(exportTo)] = append(ps.ServiceIndex.exportedToNamespace[string(exportTo)], s)
+ }
+ }
+ }
+ }
+
+ ps.initServiceAccounts(env, allServices)
+}
+
+func (ps *PushContext) initServiceAccounts(env *Environment, services []*Service) {
+ for _, svc := range services {
+ var accounts sets.String
+ // First get endpoint level service accounts
+ shard, f := env.EndpointIndex.ShardsForService(string(svc.Hostname), svc.Attributes.Namespace)
+ if f {
+ shard.RLock()
+ // copy here to reduce the lock time
+ // endpoints could update frequently, so the longer it locks, the more likely it will block other threads.
+ accounts = shard.ServiceAccounts.Copy()
+ shard.RUnlock()
+ }
+ if len(svc.ServiceAccounts) > 0 {
+ if accounts == nil {
+ accounts = sets.New(svc.ServiceAccounts...)
+ } else {
+ accounts = accounts.InsertAll(svc.ServiceAccounts...)
+ }
+ }
+ sa := sets.SortedList(spiffe.ExpandWithTrustDomains(accounts, ps.Mesh.TrustDomainAliases))
+ key := serviceAccountKey{
+ hostname: svc.Hostname,
+ namespace: svc.Attributes.Namespace,
+ }
+ ps.serviceAccounts[key] = sa
+ }
+}
+
+func SortServicesByCreationTime(services []*Service) []*Service {
+ slices.SortStableFunc(services, func(i, j *Service) int {
+ if r := i.CreationTime.Compare(j.CreationTime); r != 0 {
+ return r
+ }
+ // If creation time is the same, then behavior is nondeterministic. In this case, we can
+ // pick an arbitrary but consistent ordering based on name and namespace, which is unique.
+ // CreationTimestamp is stored in seconds, so this is not uncommon.
+ if r := cmp.Compare(i.Attributes.Name, j.Attributes.Name); r != 0 {
+ return r
+ }
+ return cmp.Compare(i.Attributes.Namespace, j.Attributes.Namespace)
+ })
+ return services
+}
+
+func resolveServiceAliases(allServices []*Service, configsUpdated sets.Set[ConfigKey]) {
+ // rawAlias builds a map of Service -> AliasFor. So this will be ExternalName -> Service.
+ // In an edge case, we can have ExternalName -> ExternalName; we resolve that below.
+ rawAlias := map[NamespacedHostname]host.Name{}
+ for _, s := range allServices {
+ if s.Resolution != Alias {
+ continue
+ }
+ nh := NamespacedHostname{
+ Hostname: s.Hostname,
+ Namespace: s.Attributes.Namespace,
+ }
+ rawAlias[nh] = host.Name(s.Attributes.K8sAttributes.ExternalName)
+ }
+
+ // unnamespacedRawAlias is like rawAlias but without namespaces.
+ // This is because an `ExternalName` isn't namespaced. If there is a conflict, the behavior is undefined.
+ // This is split from above as a minor optimization to right-size the map
+ unnamespacedRawAlias := make(map[host.Name]host.Name, len(rawAlias))
+ for k, v := range rawAlias {
+ unnamespacedRawAlias[k.Hostname] = v
+ }
+
+ // resolvedAliases builds a map of Alias -> Concrete, fully resolving through multiple hops.
+ // Ex: Alias1 -> Alias2 -> Concrete will flatten to Alias1 -> Concrete.
+ resolvedAliases := make(map[NamespacedHostname]host.Name, len(rawAlias))
+ for alias, referencedService := range rawAlias {
+ // referencedService may be another alias or a concrete service.
+ if _, f := unnamespacedRawAlias[referencedService]; !f {
+ // Common case: alias pointing to a concrete service
+ resolvedAliases[alias] = referencedService
+ continue
+ }
+ // Otherwise, we need to traverse the alias "graph".
+ // In an obscure edge case, a user could make a loop, so we will need to handle that.
+ seen := sets.New(alias.Hostname, referencedService)
+ for {
+ n, f := unnamespacedRawAlias[referencedService]
+ if !f {
+ // The destination we are pointing to is not an alias, so this is the terminal step
+ resolvedAliases[alias] = referencedService
+ break
+ }
+ if seen.InsertContains(n) {
+ // We did a loop!
+ // Kubernetes will make these NXDomain, so we can just treat it like it doesn't exist at all
+ break
+ }
+ referencedService = n
+ }
+ }
+
+ // aliasesForService builds a map of Concrete -> []Aliases
+ // This basically reverses our resolvedAliases map, which is Alias -> Concrete.
+ aliasesForService := map[host.Name][]NamespacedHostname{}
+ for alias, concrete := range resolvedAliases {
+ aliasesForService[concrete] = append(aliasesForService[concrete], alias)
+
+ // We also need to update configsUpdated, such that any "alias" updated also marks the concrete service as updated.
+ aliasKey := ConfigKey{
+ Kind: kind.ServiceEntry,
+ Name: alias.Hostname.String(),
+ Namespace: alias.Namespace,
+ }
+ // Alias. We should mark all the concrete services as updated as well.
+ if configsUpdated.Contains(aliasKey) {
+ // We only have the hostname, but we need the namespace...
+ for _, svc := range allServices {
+ if svc.Hostname == concrete {
+ configsUpdated.Insert(ConfigKey{
+ Kind: kind.ServiceEntry,
+ Name: concrete.String(),
+ Namespace: svc.Attributes.Namespace,
+ })
+ }
+ }
+ }
+ }
+ // Sort aliases so order is deterministic.
+ for _, v := range aliasesForService {
+ slices.SortFunc(v, func(a, b NamespacedHostname) int {
+ if r := cmp.Compare(a.Namespace, b.Namespace); r != 0 {
+ return r
+ }
+ return cmp.Compare(a.Hostname, b.Hostname)
+ })
+ }
+
+ // Finally, we can traverse all services and update the ones that have aliases
+ for i, s := range allServices {
+ if aliases, f := aliasesForService[s.Hostname]; f {
+ // This service has an alias; set it. We need to make a copy since the underlying Service is shared
+ s = s.DeepCopy()
+ s.Attributes.Aliases = aliases
+ allServices[i] = s
+ }
+ }
+}
+
func NewReasonStats(reasons ...TriggerReason) ReasonStats {
ret := make(ReasonStats)
for _, reason := range reasons {
diff --git a/sail/pkg/model/service.go b/sail/pkg/model/service.go
index 004793cd..0c69d941 100644
--- a/sail/pkg/model/service.go
+++ b/sail/pkg/model/service.go
@@ -11,10 +11,29 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
"github.com/google/go-cmp/cmp"
+ "istio.io/api/annotation"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/klog/v2"
+ "strings"
"sync"
"time"
)
+type Resolution int
+
+const (
+ // ClientSideLB implies that the proxy will decide the endpoint from its local lb pool
+ ClientSideLB Resolution = iota
+ // DNSLB implies that the proxy will resolve a DNS address and forward to the resolved address
+ DNSLB
+ // Passthrough implies that the proxy should forward traffic to the destination IP requested by the caller
+ Passthrough
+ // DNSRoundRobinLB implies that the proxy will resolve a DNS address and forward to the resolved address
+ DNSRoundRobinLB
+ // Alias defines a Service that is an alias for another.
+ Alias
+)
+
type EndpointDiscoverabilityPolicy interface {
String() string
}
@@ -61,6 +80,7 @@ type DubboEndpoint struct {
HealthStatus HealthStatus
SendUnhealthyEndpoints bool
DiscoverabilityPolicy EndpointDiscoverabilityPolicy `json:"-"`
+ LegacyClusterPortKey int
}
func (ep *DubboEndpoint) FirstAddressOrNil() string {
@@ -132,6 +152,66 @@ type ServiceAttributes struct {
// Namespace is "destination.service.namespace" attribute
Namespace string
ServiceRegistry provider.ID
+ K8sAttributes
+}
+
+type K8sAttributes struct {
+ // Type holds the value of the corev1.Type of the Kubernetes service
+ // spec.Type
+ Type string
+
+ // spec.ExternalName
+ ExternalName string
+
+ // NodeLocal means the proxy will only forward traffic to node local endpoints
+ // spec.InternalTrafficPolicy == Local
+ NodeLocal bool
+
+ // TrafficDistribution determines the service-level traffic distribution.
+ // This may be overridden by locality load balancing settings.
+ TrafficDistribution TrafficDistribution
+
+ // ObjectName is the object name of the underlying object. This may differ from the Service.Attributes.Name for legacy semantics.
+ ObjectName string
+
+ // spec.PublishNotReadyAddresses
+ PublishNotReadyAddresses bool
+}
+
+type TrafficDistribution int
+
+const (
+ // TrafficDistributionAny allows any destination
+ TrafficDistributionAny TrafficDistribution = iota
+ // TrafficDistributionPreferSameZone prefers traffic in the same zone, failing over to same region and then network.
+ TrafficDistributionPreferSameZone
+ // TrafficDistributionPreferSameNode prefers traffic on the same node, failing over to same subzone, then zone, region, and network.
+ TrafficDistributionPreferSameNode
+)
+
+func GetTrafficDistribution(specValue *string, annotations map[string]string) TrafficDistribution {
+ if specValue != nil {
+ switch *specValue {
+ case corev1.ServiceTrafficDistributionPreferSameZone, corev1.ServiceTrafficDistributionPreferClose:
+ return TrafficDistributionPreferSameZone
+ case corev1.ServiceTrafficDistributionPreferSameNode:
+ return TrafficDistributionPreferSameNode
+ }
+ }
+ // The TrafficDistribution field is quite new, so we allow a legacy annotation option as well
+ // This also has some custom types
+ trafficDistributionAnnotationValue := strings.ToLower(annotations[annotation.NetworkingTrafficDistribution.Name])
+ switch trafficDistributionAnnotationValue {
+ case strings.ToLower(corev1.ServiceTrafficDistributionPreferClose), strings.ToLower(corev1.ServiceTrafficDistributionPreferSameZone):
+ return TrafficDistributionPreferSameZone
+ case strings.ToLower(corev1.ServiceTrafficDistributionPreferSameNode):
+ return TrafficDistributionPreferSameNode
+ default:
+ if trafficDistributionAnnotationValue != "" {
+ klog.Warningf("Unknown traffic distribution annotation,
defaulting to any")
+ }
+ return TrafficDistributionAny
+ }
}
type AddressMap struct {
@@ -151,12 +231,18 @@ func (m *AddressMap) DeepCopy() *AddressMap {
}
type Service struct {
- Attributes ServiceAttributes
- Hostname host.Name `json:"hostname"`
- Ports PortList `json:"ports,omitempty"`
- ServiceAccounts []string `json:"serviceAccounts,omitempty"`
- ClusterVIPs AddressMap `json:"clusterVIPs,omitempty"`
- CreationTime time.Time `json:"creationTime,omitempty"`
+ Attributes ServiceAttributes
+ Hostname host.Name `json:"hostname"`
+ Ports PortList `json:"ports,omitempty"`
+ ServiceAccounts []string `json:"serviceAccounts,omitempty"`
+ ClusterVIPs AddressMap `json:"clusterVIPs,omitempty"`
+ CreationTime time.Time `json:"creationTime,omitempty"`
+ DefaultAddress string `json:"defaultAddress,omitempty"`
+ ResourceVersion string
+ Resolution Resolution
+ AutoAllocatedIPv4Address string `json:"autoAllocatedIPv4Address,omitempty"`
+ AutoAllocatedIPv6Address string `json:"autoAllocatedIPv6Address,omitempty"`
+ MeshExternal bool
}
func (s *Service) DeepCopy() *Service {
@@ -247,3 +333,83 @@ func (s *ServiceAttributes) DeepCopy() ServiceAttributes {
// nolint: govet
return out
}
+
+func (s *Service) Equals(other *Service) bool {
+ if s == nil {
+ return other == nil
+ }
+ if other == nil {
+ return s == nil
+ }
+
+ if !s.Attributes.Equals(&other.Attributes) {
+ return false
+ }
+
+ if !s.Ports.Equals(other.Ports) {
+ return false
+ }
+ if !slices.Equal(s.ServiceAccounts, other.ServiceAccounts) {
+ return false
+ }
+
+ if len(s.ClusterVIPs.Addresses) != len(other.ClusterVIPs.Addresses) {
+ return false
+ }
+ for k, v1 := range s.ClusterVIPs.Addresses {
+ if v2, ok := other.ClusterVIPs.Addresses[k]; !ok || !slices.Equal(v1, v2) {
+ return false
+ }
+ }
+
+ return s.DefaultAddress == other.DefaultAddress && s.AutoAllocatedIPv4Address == other.AutoAllocatedIPv4Address &&
+ s.AutoAllocatedIPv6Address == other.AutoAllocatedIPv6Address && s.Hostname == other.Hostname &&
+ s.Resolution == other.Resolution && s.MeshExternal == other.MeshExternal
+}
+
+func (s *ServiceAttributes) Equals(other *ServiceAttributes) bool {
+ if s == nil {
+ return other == nil
+ }
+ if other == nil {
+ return s == nil
+ }
+
+ if !maps.Equal(s.Labels, other.Labels) {
+ return false
+ }
+
+ if !maps.Equal(s.LabelSelectors, other.LabelSelectors) {
+ return false
+ }
+
+ if !maps.Equal(s.ExportTo, other.ExportTo) {
+ return false
+ }
+
+ if !slices.Equal(s.Aliases, other.Aliases) {
+ return false
+ }
+
+ if s.ClusterExternalAddresses.Len() != other.ClusterExternalAddresses.Len() {
+ return false
+ }
+
+ for k, v1 := range s.ClusterExternalAddresses.GetAddresses() {
+ if v2, ok := other.ClusterExternalAddresses.Addresses[k]; !ok || !slices.Equal(v1, v2) {
+ return false
+ }
+ }
+
+ if len(s.ClusterExternalPorts) != len(other.ClusterExternalPorts) {
+ return false
+ }
+
+ for k, v1 := range s.ClusterExternalPorts {
+ if v2, ok := s.ClusterExternalPorts[k]; !ok || !maps.Equal(v1, v2) {
+ return false
+ }
+ }
+ return s.Name == other.Name && s.Namespace == other.Namespace &&
+ s.ServiceRegistry == other.ServiceRegistry && s.K8sAttributes == other.K8sAttributes
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/controller.go b/sail/pkg/serviceregistry/kube/controller/controller.go
index 3dd79905..839ad194 100644
--- a/sail/pkg/serviceregistry/kube/controller/controller.go
+++ b/sail/pkg/serviceregistry/kube/controller/controller.go
@@ -22,14 +22,22 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+ "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ "github.com/apache/dubbo-kubernetes/pkg/ptr"
"github.com/apache/dubbo-kubernetes/pkg/queue"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/kube"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
"go.uber.org/atomic"
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sort"
"sync"
@@ -47,6 +55,28 @@ type Controller struct {
servicesMap map[host.Name]*model.Service
queue queue.Instance
initialSyncTimedout *atomic.Bool
+ configCluster bool
+ services kclient.Client[*v1.Service]
+ endpoints *endpointSliceController
+ meshWatcher mesh.Watcher
+ handlers model.ControllerHandlers
+}
+
+func NewController(kubeClient kubelib.Client, options Options) *Controller {
+ c := &Controller{
+ opts: options,
+ client: kubeClient,
+ queue: queue.NewQueueWithID(1*time.Second, string(options.ClusterID)),
+ servicesMap: make(map[host.Name]*model.Service),
+ initialSyncTimedout: atomic.NewBool(false),
+
+ configCluster: options.ConfigCluster,
+ }
+ c.services = kclient.NewFiltered[*v1.Service](kubeClient, kclient.Filter{ObjectFilter: kubeClient.ObjectFilter()})
+ registerHandlers(c, c.services, "Services", c.onServiceEvent, nil)
+ c.endpoints = newEndpointSliceController(c)
+ c.meshWatcher = options.MeshWatcher
+ return c
}
type Options struct {
@@ -63,6 +93,7 @@ type Options struct {
KrtDebugger *krt.DebugHandler
SyncTimeout time.Duration
Revision string
+ ConfigCluster bool
}
func (c *Controller) Services() []*model.Service {
@@ -84,6 +115,79 @@ func (c *Controller) GetService(hostname host.Name) *model.Service {
return svc
}
+func (c *Controller) onServiceEvent(pre, curr *v1.Service, event model.Event) error {
+ klog.V(2).Infof("Handle event %s for service %s in namespace %s", event, curr.Name, curr.Namespace)
+
+ // Create the standard (cluster.local) service.
+ svcConv := kube.ConvertService(*curr, c.opts.DomainSuffix, c.Cluster(), c.meshWatcher.Mesh())
+
+ switch event {
+ case model.EventDelete:
+ c.deleteService(svcConv)
+ default:
+ c.addOrUpdateService(pre, curr, svcConv, event, false)
+ }
+
+ return nil
+}
+
+func (c *Controller) deleteService(svc *model.Service) {
+ c.Lock()
+ delete(c.servicesMap, svc.Hostname)
+ c.Unlock()
+
+ shard := model.ShardKeyFromRegistry(c)
+ event := model.EventDelete
+ c.opts.XDSUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, event)
+ if !svc.Attributes.ExportTo.Contains(visibility.None) {
+ c.handlers.NotifyServiceHandlers(nil, svc, event)
+ }
+}
+
+func (c *Controller) addOrUpdateService(pre, curr *v1.Service, currConv *model.Service, event model.Event, updateEDSCache bool) {
+ c.Lock()
+ prevConv := c.servicesMap[currConv.Hostname]
+ c.servicesMap[currConv.Hostname] = currConv
+ c.Unlock()
+ // This full push is needed to update all endpoints, even though we do a full push on service add/update,
+ // as that full push is only triggered for the specific service.
+
+ shard := model.ShardKeyFromRegistry(c)
+ ns := currConv.Attributes.Namespace
+
+ c.opts.XDSUpdater.SvcUpdate(shard, string(currConv.Hostname), ns, event)
+ if serviceUpdateNeedsPush(pre, curr, prevConv, currConv) {
+ klog.V(2).Infof("Service %s in namespace %s updated and needs
push", currConv.Hostname, ns)
+ c.handlers.NotifyServiceHandlers(prevConv, currConv, event)
+ }
+}
+
+func serviceUpdateNeedsPush(prev, curr *v1.Service, preConv, currConv *model.Service) bool {
+ // New Service - If it is not exported, no need to push.
+ if preConv == nil {
+ return !currConv.Attributes.ExportTo.Contains(visibility.None)
+ }
+ // if service Visibility is None and has not changed in the update/delete, no need to push.
+ if preConv.Attributes.ExportTo.Contains(visibility.None) &&
+ currConv.Attributes.ExportTo.Contains(visibility.None) {
+ return false
+ }
+ // Check if there are any changes we care about by comparing `model.Service`s
+ if !preConv.Equals(currConv) {
+ return true
+ }
+ // Also check if target ports are changed since they are not included in `model.Service`
+ // `preConv.Equals(currConv)` already makes sure the length of ports is not changed
+ if prev != nil && curr != nil {
+ if !slices.EqualFunc(prev.Spec.Ports, curr.Spec.Ports, func(a, b v1.ServicePort) bool {
+ return a.TargetPort == b.TargetPort
+ }) {
+ return true
+ }
+ }
+ return false
+}
+
func (c *Controller) Provider() provider.ID {
return provider.Kubernetes
}
@@ -121,3 +225,51 @@ func (c *Controller) HasSynced() bool {
func (c *Controller) informersSynced() bool {
return true
}
+
+type FilterOutFunc[T controllers.Object] func(old, cur T) bool
+
+func registerHandlers[T controllers.ComparableObject](c *Controller,
+ informer kclient.Informer[T], otype string,
+ handler func(T, T, model.Event) error, filter FilterOutFunc[T],
+) {
+ wrappedHandler := func(prev, curr T, event model.Event) error {
+ curr = informer.Get(curr.GetName(), curr.GetNamespace())
+ if controllers.IsNil(curr) {
+ // this can happen when a delete immediately follows an update;
+ // the delete event can be handled later
+ return nil
+ }
+ return handler(prev, curr, event)
+ }
+
+ informer.AddEventHandler(
+ controllers.EventHandler[T]{
+ AddFunc: func(obj T) {
+ c.queue.Push(func() error {
+ return wrappedHandler(ptr.Empty[T](), obj, model.EventAdd)
+ })
+ },
+ UpdateFunc: func(old, cur T) {
+ if filter != nil {
+ if filter(old, cur) {
+ return
+ }
+ }
+ c.queue.Push(func() error {
+ return wrappedHandler(old, cur, model.EventUpdate)
+ })
+ },
+ DeleteFunc: func(obj T) {
+ c.queue.Push(func() error {
+ return handler(ptr.Empty[T](), obj, model.EventDelete)
+ })
+ },
+ })
+}
+
+func (c *Controller) servicesForNamespacedName(name types.NamespacedName) []*model.Service {
+ if svc := c.GetService(kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix)); svc != nil {
+ return []*model.Service{svc}
+ }
+ return nil
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/endpointslice.go b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
new file mode 100644
index 00000000..3e240cc9
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
@@ -0,0 +1,120 @@
+package controller
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
+ "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "istio.io/api/annotation"
+ corev1 "k8s.io/api/core/v1"
+ v1 "k8s.io/api/discovery/v1"
+ klabels "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/selection"
+ mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
+ "strings"
+)
+
+var (
+ endpointSliceRequirement = labelRequirement(mcs.LabelServiceName, selection.DoesNotExist, nil)
+ endpointSliceSelector = klabels.NewSelector().Add(*endpointSliceRequirement)
+)
+
+type endpointSliceController struct {
+ slices kclient.Client[*v1.EndpointSlice]
+ c *Controller
+}
+
+func newEndpointSliceController(c *Controller) *endpointSliceController {
+ slices := kclient.NewFiltered[*v1.EndpointSlice](c.client, kclient.Filter{ObjectFilter: c.client.ObjectFilter()})
+ out := &endpointSliceController{
+ c: c,
+ slices: slices,
+ }
+ registerHandlers[*v1.EndpointSlice](c, slices, "EndpointSlice", out.onEvent, nil)
+ return out
+}
+
+func (esc *endpointSliceController) onEvent(_, ep *v1.EndpointSlice, event model.Event) error {
+ esc.onEventInternal(nil, ep, event)
+ return nil
+}
+
+func (esc *endpointSliceController) onEventInternal(_, ep *v1.EndpointSlice, event model.Event) {
+ esLabels := ep.GetLabels()
+ if !endpointSliceSelector.Matches(klabels.Set(esLabels)) {
+ return
+ }
+ // Update internal endpoint cache no matter what kind of service, even headless service.
+ // As for gateways, the cluster discovery type is `EDS` for headless service.
+ // namespacedName := getServiceNamespacedName(ep)
+ // log.Debugf("Handle EDS endpoint %s %s in namespace %s", namespacedName.Name, event, namespacedName.Namespace)
+ // if event == model.EventDelete {
+ // esc.deleteEndpointSlice(ep)
+ // } else {
+ // esc.updateEndpointSlice(ep)
+ // }
+
+ // Now check if we need to do a full push for the service.
+ // If the service is headless, we need to do a full push if service exposes TCP ports
+ // to create IP based listeners. For pure HTTP headless services, we only need to push NDS.
+ name := serviceNameForEndpointSlice(esLabels)
+ namespace := ep.GetNamespace()
+ svc := esc.c.services.Get(name, namespace)
+ if svc != nil && !serviceNeedsPush(svc) {
+ return
+ }
+
+ // hostnames := esc.c.hostNamesForNamespacedName(namespacedName)
+ // log.Debugf("triggering EDS push for %s in namespace %s", hostnames,
namespacedName.Namespace)
+ // Trigger EDS push for all hostnames.
+ // esc.pushEDS(hostnames, namespacedName.Namespace)
+
+ if svc == nil || svc.Spec.ClusterIP != corev1.ClusterIPNone || svc.Spec.Type == corev1.ServiceTypeExternalName {
+ return
+ }
+
+ configsUpdated := sets.New[model.ConfigKey]()
+ supportsOnlyHTTP := true
+ for _, modelSvc := range esc.c.servicesForNamespacedName(config.NamespacedName(svc)) {
+ for _, p := range modelSvc.Ports {
+ if !p.Protocol.IsHTTP() {
+ supportsOnlyHTTP = false
+ break
+ }
+ }
+ if supportsOnlyHTTP {
+ // pure HTTP headless services should not need a full push since they do not
+ // require a Listener based on IP: https://github.com/istio/istio/issues/48207
+ configsUpdated.Insert(model.ConfigKey{Kind: kind.DNSName, Name: modelSvc.Hostname.String(), Namespace: svc.Namespace})
+ } else {
+ configsUpdated.Insert(model.ConfigKey{Kind: kind.ServiceEntry, Name: modelSvc.Hostname.String(), Namespace: svc.Namespace})
+ }
+ }
+
+ if len(configsUpdated) > 0 {
+ esc.c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
+ Full: true,
+ ConfigsUpdated: configsUpdated,
+ Reason: model.NewReasonStats(model.HeadlessEndpointUpdate),
+ })
+ }
+}
+
+func serviceNameForEndpointSlice(labels map[string]string) string {
+ return labels[v1.LabelServiceName]
+}
+
+func serviceNeedsPush(svc *corev1.Service) bool {
+ if svc.Annotations[annotation.NetworkingExportTo.Name] != "" {
+ namespaces := strings.Split(svc.Annotations[annotation.NetworkingExportTo.Name], ",")
+ for _, ns := range namespaces {
+ ns = strings.TrimSpace(ns)
+ if ns == string(visibility.None) {
+ return false
+ }
+ }
+ }
+ return true
+}
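For readers following the push logic in endpointslice.go, here is a minimal, self-contained sketch of the decision rule: a headless service whose ports are all HTTP only needs a DNS-name push, while any non-HTTP port forces a full ServiceEntry push. PortProto and pushKindFor are illustrative stand-ins for this note only, not types or helpers from the model package used by the controller.

package main

import "fmt"

// PortProto is a stand-in for a service port's protocol.
type PortProto string

func (p PortProto) IsHTTP() bool {
	return p == "HTTP" || p == "HTTP2" || p == "GRPC"
}

// pushKindFor mirrors the rule in onEventInternal: pure-HTTP headless services
// only need a DNSName update; any non-HTTP port needs a ServiceEntry full push.
func pushKindFor(ports []PortProto) string {
	for _, p := range ports {
		if !p.IsHTTP() {
			return "ServiceEntry" // IP-based listeners are required
		}
	}
	return "DNSName" // NDS-only push is enough
}

func main() {
	fmt.Println(pushKindFor([]PortProto{"HTTP", "GRPC"})) // DNSName
	fmt.Println(pushKindFor([]PortProto{"HTTP", "TCP"}))  // ServiceEntry
}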
diff --git a/sail/pkg/serviceregistry/kube/controller/multicluster.go b/sail/pkg/serviceregistry/kube/controller/multicluster.go
new file mode 100644
index 00000000..cad7a102
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/multicluster.go
@@ -0,0 +1,94 @@
+package controller
+
+import (
+ kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/multicluster"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/server"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/serviceentry"
+ "k8s.io/klog/v2"
+)
+
+type kubeController struct {
+ MeshServiceController *aggregate.Controller
+ *Controller
+ workloadEntryController *serviceentry.Controller
+ stop chan struct{}
+}
+
+func (k *kubeController) Close() {
+ close(k.stop)
+}
+
+type Multicluster struct {
+ // serverID of this pilot instance used for leader election
+ serverID string
+
+ // options to use when creating kube controllers
+ opts Options
+
+ s server.Instance
+
+ clusterLocal model.ClusterLocalProvider
+
+ distributeCACert bool
+ caBundleWatcher *keycertbundle.Watcher
+ revision string
+
+ component *multicluster.Component[*kubeController]
+}
+
+func NewMulticluster(
+ serverID string,
+ opts Options,
+ caBundleWatcher *keycertbundle.Watcher,
+ revision string,
+ distributeCACert bool,
+ clusterLocal model.ClusterLocalProvider,
+ s server.Instance,
+ controller *multicluster.Controller,
+) *Multicluster {
+ mc := &Multicluster{
+ serverID: serverID,
+ opts: opts,
+ distributeCACert: distributeCACert,
+ caBundleWatcher: caBundleWatcher,
+ revision: revision,
+ clusterLocal: clusterLocal,
+ s: s,
+ }
+ mc.component = multicluster.BuildMultiClusterComponent(controller, func(cluster *multicluster.Cluster) *kubeController {
+ stop := make(chan struct{})
+ client := cluster.Client
+ configCluster := opts.ClusterID == cluster.ID
+
+ options := opts
+ options.ClusterID = cluster.ID
+ klog.Infof("Initializing Kubernetes service registry %q",
options.ClusterID)
+ options.ConfigCluster = configCluster
+ kubeRegistry := NewController(client, options)
+ kubeController := &kubeController{
+ MeshServiceController: opts.MeshServiceController,
+ Controller: kubeRegistry,
+ stop: stop,
+ }
+ mc.initializeCluster(cluster, kubeController, kubeRegistry, options, configCluster, stop)
+ return kubeController
+ })
+
+ return mc
+}
+
+func (m *Multicluster) initializeCluster(cluster *multicluster.Cluster, kubeController *kubeController, kubeRegistry *Controller,
+ options Options, configCluster bool, clusterStopCh <-chan struct{},
+) {
+ // run after WorkloadHandler is added
+ m.opts.MeshServiceController.AddRegistryAndRun(kubeRegistry, clusterStopCh)
+}
+
+func (m *Multicluster) checkShouldLead(client kubelib.Client, systemNamespace string, stop <-chan struct{}) bool {
+ var res bool
+ return res
+}
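The Multicluster registry above relies on multicluster.BuildMultiClusterComponent to build one kubeController per discovered cluster and to tear it down when that cluster goes away. Below is a rough, self-contained sketch of that per-cluster component pattern, assuming nothing about the real multicluster package: ClusterID, perCluster, and registry are hypothetical stand-ins used only to illustrate the create-on-add / close-on-delete lifecycle.

package main

import "fmt"

type ClusterID string

// perCluster keeps one controller-like value per cluster, built by a factory,
// and closes it when the cluster is removed.
type perCluster[T interface{ Close() }] struct {
	build func(ClusterID) T
	items map[ClusterID]T
}

func newPerCluster[T interface{ Close() }](build func(ClusterID) T) *perCluster[T] {
	return &perCluster[T]{build: build, items: map[ClusterID]T{}}
}

func (p *perCluster[T]) ClusterAdded(id ClusterID)   { p.items[id] = p.build(id) }
func (p *perCluster[T]) ClusterDeleted(id ClusterID) { p.items[id].Close(); delete(p.items, id) }

type registry struct{ id ClusterID }

func (r *registry) Close() { fmt.Println("stopping registry for", r.id) }

func main() {
	c := newPerCluster(func(id ClusterID) *registry {
		fmt.Println("initializing registry for", id)
		return &registry{id: id}
	})
	c.ClusterAdded("Kubernetes")
	c.ClusterDeleted("Kubernetes")
}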
diff --git a/sail/pkg/serviceregistry/kube/controller/util.go b/sail/pkg/serviceregistry/kube/controller/util.go
new file mode 100644
index 00000000..f8c5ecaf
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/controller/util.go
@@ -0,0 +1,16 @@
+package controller
+
+import (
+ "fmt"
+ klabels "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/selection"
+ "k8s.io/apimachinery/pkg/util/validation/field"
+)
+
+func labelRequirement(key string, op selection.Operator, vals []string, opts ...field.PathOption) *klabels.Requirement {
+ out, err := klabels.NewRequirement(key, op, vals, opts...)
+ if err != nil {
+ panic(fmt.Sprintf("failed creating requirements for Service:
%v", err))
+ }
+ return out
+}
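labelRequirement is used by endpointslice.go to build the selector that skips EndpointSlices managed by the MCS controller. A small runnable example of what that selector does, using only the apimachinery labels package; the "multicluster.kubernetes.io/service-name" key is an assumption standing in for mcs.LabelServiceName:

package main

import (
	"fmt"

	klabels "k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

func main() {
	// Same shape as labelRequirement above: panicking on error is acceptable
	// because the requirement is built from compile-time constants.
	req, err := klabels.NewRequirement("multicluster.kubernetes.io/service-name", selection.DoesNotExist, nil)
	if err != nil {
		panic(err)
	}
	sel := klabels.NewSelector().Add(*req)

	// A plain EndpointSlice matches the selector and is processed...
	fmt.Println(sel.Matches(klabels.Set{"kubernetes.io/service-name": "foo"})) // true
	// ...while an MCS-imported slice is filtered out.
	fmt.Println(sel.Matches(klabels.Set{"multicluster.kubernetes.io/service-name": "foo"})) // false
}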
diff --git a/sail/pkg/serviceregistry/kube/conversion.go b/sail/pkg/serviceregistry/kube/conversion.go
new file mode 100644
index 00000000..1d616bd1
--- /dev/null
+++ b/sail/pkg/serviceregistry/kube/conversion.go
@@ -0,0 +1,152 @@
+package kube
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
+ "github.com/apache/dubbo-kubernetes/pkg/config/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+ "github.com/apache/dubbo-kubernetes/pkg/spiffe"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+ "istio.io/api/annotation"
+ meshconfig "istio.io/api/mesh/v1alpha1"
+ corev1 "k8s.io/api/core/v1"
+ "strings"
+)
+
+func ServiceHostname(name, namespace, domainSuffix string) host.Name {
+ return host.Name(name + "." + namespace + "." + "svc" + "." + domainSuffix) // Format: "%s.%s.svc.%s"
+}
+
+func ConvertService(svc corev1.Service, domainSuffix string, clusterID cluster.ID, mesh *meshconfig.MeshConfig) *model.Service {
+ addrs := []string{constants.UnspecifiedIP}
+ resolution := model.ClientSideLB
+ externalName := ""
+ nodeLocal := false
+
+ if svc.Spec.Type == corev1.ServiceTypeExternalName && svc.Spec.ExternalName != "" {
+ externalName = svc.Spec.ExternalName
+ resolution = model.Alias
+ }
+
+ if svc.Spec.InternalTrafficPolicy != nil && *svc.Spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyLocal {
+ nodeLocal = true
+ }
+
+ if svc.Spec.ClusterIP == corev1.ClusterIPNone { // headless services should not be load balanced
+ resolution = model.Passthrough
+ } else if svc.Spec.ClusterIP != "" {
+ addrs[0] = svc.Spec.ClusterIP
+ if len(svc.Spec.ClusterIPs) > 1 {
+ addrs = svc.Spec.ClusterIPs
+ }
+ }
+
+ ports := make([]*model.Port, 0, len(svc.Spec.Ports))
+ for _, port := range svc.Spec.Ports {
+ ports = append(ports, convertPort(port))
+ }
+ var exportTo sets.Set[visibility.Instance]
+ serviceaccounts := make([]string, 0)
+ if svc.Annotations[annotation.AlphaCanonicalServiceAccounts.Name] != "" {
+ serviceaccounts = append(serviceaccounts, strings.Split(svc.Annotations[annotation.AlphaCanonicalServiceAccounts.Name], ",")...)
+ }
+ if svc.Annotations[annotation.AlphaKubernetesServiceAccounts.Name] != "" {
+ for _, ksa := range strings.Split(svc.Annotations[annotation.AlphaKubernetesServiceAccounts.Name], ",") {
+ serviceaccounts = append(serviceaccounts, kubeToDubboServiceAccount(ksa, svc.Namespace, mesh))
+ }
+ }
+ if svc.Annotations[annotation.NetworkingExportTo.Name] != "" {
+ namespaces := strings.Split(svc.Annotations[annotation.NetworkingExportTo.Name], ",")
+ exportTo = sets.NewWithLength[visibility.Instance](len(namespaces))
+ for _, ns := range namespaces {
+ ns = strings.TrimSpace(ns)
+ exportTo.Insert(visibility.Instance(ns))
+ }
+ }
+
+ dubboService := &model.Service{
+ Hostname: ServiceHostname(svc.Name, svc.Namespace, domainSuffix),
+ ClusterVIPs: model.AddressMap{
+ Addresses: map[cluster.ID][]string{
+ clusterID: addrs,
+ },
+ },
+ Ports: ports,
+ DefaultAddress: addrs[0],
+ Resolution: resolution,
+ ServiceAccounts: serviceaccounts,
+ ResourceVersion: svc.ResourceVersion,
+ Attributes: model.ServiceAttributes{
+ ServiceRegistry: provider.Kubernetes,
+ Name: svc.Name,
+ Namespace: svc.Namespace,
+ Labels: svc.Labels,
+ ExportTo: exportTo,
+ LabelSelectors: svc.Spec.Selector,
+ },
+ }
+
+ switch svc.Spec.Type {
+ case corev1.ServiceTypeNodePort:
+ if _, ok := svc.Annotations[annotation.TrafficNodeSelector.Name]; !ok {
+ break
+ }
+ // store the service port to node port mappings
+ portMap := make(map[uint32]uint32)
+ for _, p := range svc.Spec.Ports {
+ portMap[uint32(p.Port)] = uint32(p.NodePort)
+ }
+ dubboService.Attributes.ClusterExternalPorts = map[cluster.ID]map[uint32]uint32{clusterID: portMap}
+ // address mappings will be done elsewhere
+ case corev1.ServiceTypeLoadBalancer:
+ if len(svc.Status.LoadBalancer.Ingress) > 0 {
+ var lbAddrs []string
+ for _, ingress := range svc.Status.LoadBalancer.Ingress {
+ if len(ingress.IP) > 0 {
+ lbAddrs = append(lbAddrs, ingress.IP)
+ } else if len(ingress.Hostname) > 0 {
+ // DO NOT resolve the DNS here. In environments like AWS, the ELB hostname
+ // does not have a repeatable DNS address, and IPs resolved at an earlier point
+ // in time may not work. So, when we get just hostnames instead of IPs, we need
+ // to smartly switch from EDS to strict_dns rather than doing the naive thing of
+ // resolving the DNS name and hoping the resolution is a one-time task.
+ lbAddrs = append(lbAddrs, ingress.Hostname)
+ }
+ }
+ if len(lbAddrs) > 0 {
+ if dubboService.Attributes.ClusterExternalAddresses == nil {
+ dubboService.Attributes.ClusterExternalAddresses = &model.AddressMap{}
+ }
+ dubboService.Attributes.ClusterExternalAddresses.SetAddressesFor(clusterID, lbAddrs)
+ }
+ }
+ }
+
+ dubboService.Attributes.Type = string(svc.Spec.Type)
+ dubboService.Attributes.ExternalName = externalName
+ dubboService.Attributes.TrafficDistribution = model.GetTrafficDistribution(svc.Spec.TrafficDistribution, svc.Annotations)
+ dubboService.Attributes.NodeLocal = nodeLocal
+ dubboService.Attributes.PublishNotReadyAddresses = svc.Spec.PublishNotReadyAddresses
+ if len(svc.Spec.ExternalIPs) > 0 {
+ if dubboService.Attributes.ClusterExternalAddresses == nil {
+ dubboService.Attributes.ClusterExternalAddresses = &model.AddressMap{}
+ }
+ dubboService.Attributes.ClusterExternalAddresses.AddAddressesFor(clusterID, svc.Spec.ExternalIPs)
+ }
+ return dubboService
+}
+
+func convertPort(port corev1.ServicePort) *model.Port {
+ return &model.Port{
+ Name: port.Name,
+ Port: int(port.Port),
+ Protocol: kube.ConvertProtocol(port.Port, port.Name, port.Protocol, port.AppProtocol),
+ }
+}
+
+func kubeToDubboServiceAccount(saname string, ns string, mesh *meshconfig.MeshConfig) string {
+ return spiffe.MustGenSpiffeURI(mesh, ns, saname)
+}
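ConvertService derives the service's resolution mode from the Kubernetes spec: an ExternalName service becomes an alias, a headless service is passthrough, and everything else uses client-side load balancing. The sketch below condenses that control flow with the same precedence as the code above; resolutionFor and its string return values are illustrative stand-ins, not the model.Resolution type actually used.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// resolutionFor mirrors the assignments in ConvertService, in the same order,
// so a later condition can overwrite an earlier one just as in the source.
func resolutionFor(spec corev1.ServiceSpec) string {
	resolution := "ClientSideLB"
	if spec.Type == corev1.ServiceTypeExternalName && spec.ExternalName != "" {
		resolution = "Alias"
	}
	if spec.ClusterIP == corev1.ClusterIPNone { // headless: do not load balance
		resolution = "Passthrough"
	}
	return resolution
}

func main() {
	fmt.Println(resolutionFor(corev1.ServiceSpec{ClusterIP: corev1.ClusterIPNone}))                                       // Passthrough
	fmt.Println(resolutionFor(corev1.ServiceSpec{Type: corev1.ServiceTypeExternalName, ExternalName: "db.example.com"})) // Alias
	fmt.Println(resolutionFor(corev1.ServiceSpec{ClusterIP: "10.0.0.1"}))                                                 // ClientSideLB
}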
diff --git a/sail/pkg/serviceregistry/serviceentry/controller.go b/sail/pkg/serviceregistry/serviceentry/controller.go
new file mode 100644
index 00000000..5d116e7c
--- /dev/null
+++ b/sail/pkg/serviceregistry/serviceentry/controller.go
@@ -0,0 +1,4 @@
+package serviceentry
+
+type Controller struct {
+}
diff --git a/sail/pkg/xds/eds.go b/sail/pkg/xds/eds.go
index 811dacd7..d2afda10 100644
--- a/sail/pkg/xds/eds.go
+++ b/sail/pkg/xds/eds.go
@@ -1,23 +1,12 @@
package xds
-import (
- "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
- "github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/sail/pkg/model"
-)
+import "github.com/apache/dubbo-kubernetes/sail/pkg/model"
-// TODO EDS
-func (s *DiscoveryServer) EDSUpdate(shard model.ShardKey, serviceName string, namespace string,
- dubboEndpoints []*model.DubboEndpoint,
-) {
- // Update the endpoint shards
- pushType := s.Env.EndpointIndex.UpdateServiceEndpoints(shard, serviceName, namespace, dubboEndpoints, true)
- if pushType == model.IncrementalPush || pushType == model.FullPush {
- // Trigger a push
- s.ConfigUpdate(&model.PushRequest{
- Full: pushType == model.FullPush,
- ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: serviceName, Namespace: namespace}),
- Reason: model.NewReasonStats(model.EndpointUpdate),
- })
+func (s *DiscoveryServer) SvcUpdate(shard model.ShardKey, hostname string, namespace string, event model.Event) {
+ // When a service is deleted, we should clean up the endpoint shards and also remove keys from EndpointIndex to
+ // prevent memory leaks.
+ if event == model.EventDelete {
+ s.Env.EndpointIndex.DeleteServiceShard(shard, hostname, namespace, false)
+ } else {
}
}
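The eds.go change above makes SvcUpdate drop the service's shard from the endpoint index on delete so stale keys cannot accumulate. A minimal sketch of why the delete path matters, with shardKey and endpointIndex as hypothetical stand-ins for model.ShardKey and model.EndpointIndex rather than the actual types:

package main

import "fmt"

type shardKey string

// endpointIndex maps namespace/hostname -> shard -> endpoints.
type endpointIndex struct {
	shards map[string]map[shardKey][]string
}

func key(hostname, namespace string) string { return namespace + "/" + hostname }

func (e *endpointIndex) update(s shardKey, hostname, namespace string, eps []string) {
	k := key(hostname, namespace)
	if e.shards[k] == nil {
		e.shards[k] = map[shardKey][]string{}
	}
	e.shards[k][s] = eps
}

// deleteServiceShard drops the shard's endpoints and, once no shard remains,
// removes the service key entirely so deleted services do not leak in the index.
func (e *endpointIndex) deleteServiceShard(s shardKey, hostname, namespace string) {
	k := key(hostname, namespace)
	delete(e.shards[k], s)
	if len(e.shards[k]) == 0 {
		delete(e.shards, k)
	}
}

func main() {
	idx := &endpointIndex{shards: map[string]map[shardKey][]string{}}
	idx.update("Kubernetes", "foo.default.svc.cluster.local", "default", []string{"10.0.0.1:8080"})
	idx.deleteServiceShard("Kubernetes", "foo.default.svc.cluster.local", "default")
	fmt.Println(len(idx.shards)) // 0: nothing is left behind once the service is gone
}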