This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new 7c0a5811 [sail] add init container (#790)
7c0a5811 is described below
commit 7c0a58111e750f692332f01fa4ef769889a8f966
Author: Jian Zhong <[email protected]>
AuthorDate: Sun Sep 21 15:41:39 2025 +0800
[sail] add init container (#790)
---
go.mod | 3 +-
go.sum | 2 +
pkg/adsc/adsc.go | 830 +++++++++++++++++++++
pkg/adsc/util.go | 57 ++
pkg/config/schema/collection/schemas.go | 232 ++++++
pkg/config/schema/collections/collections.go | 14 +
pkg/config/schema/resource/schema.go | 369 +++++++++
pkg/config/validation/validation.go | 15 +-
pkg/kube/collection/{collection.go => schemas.go} | 0
pkg/kube/krt/core.go | 6 +
pkg/kube/krt/debug.go | 15 +
pkg/kube/krt/files/files.go | 199 ++++-
pkg/kube/krt/index.go | 8 +
pkg/kube/krt/static.go | 251 +++++++
pkg/kube/krt/sync.go | 10 +
pkg/model/xds.go | 11 +
pkg/security/security.go | 36 +
pkg/wellknown/wellknown.go | 13 +
sail/cmd/sail-discovery/app/cmd.go | 6 +-
sail/pkg/bootstrap/configcontroller.go | 257 +++++++
sail/pkg/bootstrap/server.go | 67 +-
sail/pkg/bootstrap/servicecontroller.go | 62 ++
sail/pkg/bootstrap/util.go | 4 +-
sail/pkg/config/aggregate/config.go | 189 +++++
sail/pkg/config/kube/crd/config.go | 13 +
sail/pkg/config/kube/crd/conversion.go | 57 ++
sail/pkg/config/kube/file/controller.go | 192 +++++
sail/pkg/config/memory/controller.go | 140 ++++
sail/pkg/config/memory/monitor.go | 22 +
sail/pkg/config/memory/store.go | 250 +++++++
sail/pkg/credentials/kube/secrets.go | 87 +++
sail/pkg/credentials/model.go | 13 +
sail/pkg/model/config.go | 26 +
sail/pkg/model/context.go | 5 +-
sail/pkg/model/controller.go | 27 +
sail/pkg/model/push_context.go | 3 +
sail/pkg/model/service.go | 4 +
sail/pkg/networking/util/util.go | 6 +
sail/pkg/server/instance.go | 11 +
sail/pkg/serviceregistry/aggregate/controller.go | 55 ++
sail/pkg/serviceregistry/instance.go | 20 +
.../serviceregistry/kube/controller/controller.go | 22 +-
.../providers.go => provider/provider.go} | 2 +-
sail/pkg/xds/v3/model.go | 11 +
security/pkg/pki/ca/ca.go | 2 +-
45 files changed, 3587 insertions(+), 37 deletions(-)
diff --git a/go.mod b/go.mod
index d3038f32..5b397c0a 100644
--- a/go.mod
+++ b/go.mod
@@ -38,6 +38,7 @@ require (
github.com/go-jose/go-jose/v4 v4.0.5
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.4
+ github.com/google/go-cmp v0.7.0
github.com/google/go-containerregistry v0.20.6
github.com/hashicorp/go-multierror v1.1.1
github.com/heroku/color v0.0.6
@@ -71,6 +72,7 @@ require (
)
require (
+ cel.dev/expr v0.24.0 // indirect
dario.cat/mergo v1.0.2 // indirect
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
@@ -143,7 +145,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/btree v1.1.3 // indirect
github.com/google/gnostic-models v0.6.9 // indirect
- github.com/google/go-cmp v0.7.0 // indirect
github.com/google/pprof v0.0.0-20250607225305-033d6d78b36a // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.6.0 // indirect
diff --git a/go.sum b/go.sum
index 54d2b529..c0bd9a31 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,5 @@
+cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
+cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.114.0 h1:OIPFAdfrFDFO2ve2U7r/H5SwSbBzEdrBdE7xkgwc+kY=
cloud.google.com/go v0.114.0/go.mod h1:ZV9La5YYxctro1HTPug5lXH/GefROyW8PPD4T8n9J8E=
diff --git a/pkg/adsc/adsc.go b/pkg/adsc/adsc.go
new file mode 100644
index 00000000..16282d67
--- /dev/null
+++ b/pkg/adsc/adsc.go
@@ -0,0 +1,830 @@
+package adsc
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/backoff"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+ "github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
+ "github.com/apache/dubbo-kubernetes/pkg/wellknown"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/networking/util"
+ v3 "github.com/apache/dubbo-kubernetes/sail/pkg/xds/v3"
+ cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
+ core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+ endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+ listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+ route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+ discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/protobuf/proto"
+ anypb "google.golang.org/protobuf/types/known/anypb"
+ pstruct "google.golang.org/protobuf/types/known/structpb"
+ mcp "istio.io/api/mcp/v1alpha1"
+ "istio.io/api/mesh/v1alpha1"
+ "k8s.io/klog/v2"
+ "math"
+ "net"
+ "os"
+ "strings"
+ "sync"
+ "time"
+)
+
+const (
+ defaultClientMaxReceiveMessageSize = math.MaxInt32
+ defaultInitialConnWindowSize = 1024 * 1024 // default gRPC InitialConnWindowSize
+ defaultInitialWindowSize = 1024 * 1024 // default gRPC InitialWindowSize
+)
+
+type Config struct {
+ // Address of the xDS server
+ Address string
+ // Namespace defaults to 'default'
+ Namespace string
+ // CertDir is the directory where mTLS certs are configured.
+ // If CertDir and Secret are empty, an insecure connection will be used.
+ // TODO: implement SecretManager for cert dir
+ CertDir string
+ // Secrets is the interface used for getting keys and rootCA.
+ SecretManager security.SecretManager
+ GrpcOpts []grpc.DialOption
+ // XDSRootCAFile explicitly sets the root CA to be used for the XDS connection.
+ // Mirrors Envoy file.
+ XDSRootCAFile string
+ // XDSSAN is the expected SAN of the XDS server. If not set, the ProxyConfig.DiscoveryAddress is used.
+ XDSSAN string
+ // InsecureSkipVerify skips client verification of the server's certificate chain and host name.
+ InsecureSkipVerify bool
+ // Workload defaults to 'test'
+ Workload string
+ // Revision for this control plane instance. We will only read configs that match this revision.
+ Revision string
+ // Meta includes additional metadata for the node
+ Meta *pstruct.Struct
+ // IP is currently the primary key used to locate inbound configs. It is sent by the client and
+ // must match a known endpoint IP. Tests can use a ServiceEntry to register fake IPs.
+ IP string
+ Locality *core.Locality
+ // BackoffPolicy determines the reconnect policy. Based on MCP client.
+ BackoffPolicy backoff.BackOff
+}
+
+// ADSC implements a basic client for ADS, for use in stress tests and tools
+// or libraries that need to connect to Istio pilot or other ADS servers.
+type ADSC struct {
+ // Stream is the GRPC connection stream, allowing direct GRPC send operations.
+ // Set after Dial is called.
+ stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
+ // xds client used to create a stream
+ client discovery.AggregatedDiscoveryServiceClient
+ conn *grpc.ClientConn
+ // Indicates if the ADSC client is closed
+ closed bool
+ watchTime time.Time
+ // Updates includes the type of the last update received from the server.
+ Updates chan string
+ errChan chan error
+ XDSUpdates chan *discovery.DiscoveryResponse
+ VersionInfo map[string]string
+
+ // Last received message, by type
+ Received map[string]*discovery.DiscoveryResponse
+
+ mutex sync.RWMutex
+
+ Mesh *v1alpha1.MeshConfig
+
+ // Retrieved configurations can be stored using the common istio model interface.
+ Store model.ConfigStore
+ cfg *ADSConfig
+ // sendNodeMeta is set to true if the connection is new - and we need to send node meta.
+ sendNodeMeta bool
+ // initialLoad tracks the time to receive the initial configuration.
+ initialLoad time.Duration
+ // indicates if the initial LDS request is sent
+ initialLds bool
+ // httpListeners contains received listeners with an http_connection_manager filter.
+ httpListeners map[string]*listener.Listener
+ // tcpListeners contains all listeners of type TCP (not-HTTP)
+ tcpListeners map[string]*listener.Listener
+ // All received clusters of type eds, keyed by name
+ edsClusters map[string]*cluster.Cluster
+ // All received clusters of no-eds type, keyed by name
+ clusters map[string]*cluster.Cluster
+ // All received routes, keyed by route name
+ routes map[string]*route.RouteConfiguration
+ // All received endpoints, keyed by cluster name
+ eds map[string]*endpoint.ClusterLoadAssignment
+ sync map[string]time.Time
+}
+
+type ResponseHandler interface {
+ HandleResponse(con *ADSC, response *discovery.DiscoveryResponse)
+}
+
+// ADSConfig for the ADS connection.
+type ADSConfig struct {
+ Config
+
+ // InitialDiscoveryRequests is a list of resources to watch at first, represented as URLs (for new XDS resource naming)
+ // or type URLs.
+ InitialDiscoveryRequests []*discovery.DiscoveryRequest
+
+ // ResponseHandler will be called on each DiscoveryResponse.
+ // TODO: mirror Generator, allow adding handler per type
+ ResponseHandler ResponseHandler
+}
+
+// New creates a new ADSC, maintaining a connection to an XDS server.
+// Will:
+// - get certificate using the Secret provider, if CertRequired
+// - connect to the XDS server specified in ProxyConfig
+// - send initial request for watched resources
+// - wait for response from XDS server
+// - on success, start a background thread to maintain the connection, with exp. backoff.
+func New(discoveryAddr string, opts *ADSConfig) (*ADSC, error) {
+ if opts == nil {
+ opts = &ADSConfig{}
+ }
+ opts.Config = setDefaultConfig(&opts.Config)
+ opts.Address = discoveryAddr
+ adsc := &ADSC{
+ Updates: make(chan string, 100),
+ XDSUpdates: make(chan *discovery.DiscoveryResponse, 100),
+ VersionInfo: map[string]string{},
+ Received: map[string]*discovery.DiscoveryResponse{},
+ cfg: opts,
+ sync: map[string]time.Time{},
+ errChan: make(chan error, 10),
+ }
+ if err := adsc.Dial(); err != nil {
+ return nil, err
+ }
+
+ return adsc, nil
+}
+
+func defaultGrpcDialOptions() []grpc.DialOption {
+ return []grpc.DialOption{
+ // TODO(SpecialYang) maybe need to make it configurable.
+ grpc.WithInitialWindowSize(int32(defaultInitialWindowSize)),
+ grpc.WithInitialConnWindowSize(int32(defaultInitialConnWindowSize)),
+ grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize)),
+ }
+}
+
+func (a *ADSC) node() *core.Node {
+ return buildNode(&a.cfg.Config)
+}
+
+func buildNode(config *Config) *core.Node {
+ n := &core.Node{
+ // Id: nodeID(config),
+ Locality: config.Locality,
+ }
+ if config.Meta == nil {
+ n.Metadata = &pstruct.Struct{
+ Fields: map[string]*pstruct.Value{
+ "ISTIO_VERSION": {Kind:
&pstruct.Value_StringValue{StringValue: "65536.65536.65536"}},
+ },
+ }
+ } else {
+ n.Metadata = config.Meta
+ if config.Meta.Fields["ISTIO_VERSION"] == nil {
+ config.Meta.Fields["ISTIO_VERSION"] =
&pstruct.Value{Kind: &pstruct.Value_StringValue{StringValue:
"65536.65536.65536"}}
+ }
+ }
+ return n
+}
+
+func (a *ADSC) handleRecv() {
+ // We connected, so reset the backoff
+ if a.cfg.BackoffPolicy != nil {
+ a.cfg.BackoffPolicy.Reset()
+ }
+ for {
+ var err error
+ msg, err := a.stream.Recv()
+ if err != nil {
+ klog.Errorf("connection closed with err: %v", err)
+ select {
+ case a.errChan <- err:
+ default:
+ }
+ // if 'reconnect' enabled - schedule a new Run
+ if a.cfg.BackoffPolicy != nil {
+ time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
+ } else {
+ a.Close()
+ a.WaitClear()
+ a.Updates <- ""
+ a.XDSUpdates <- nil
+ close(a.errChan)
+ }
+ return
+ }
+
+ // Group-value-kind - used for high level api generator.
+ resourceGvk, isMCP := convertTypeURLToMCPGVK(msg.TypeUrl)
+
+ // TODO WithLabels
+ if a.cfg.ResponseHandler != nil {
+ a.cfg.ResponseHandler.HandleResponse(a, msg)
+ }
+
+ if msg.TypeUrl == gvk.MeshConfig.String() &&
+ len(msg.Resources) > 0 {
+ rsc := msg.Resources[0]
+ m := &v1alpha1.MeshConfig{}
+ err = proto.Unmarshal(rsc.Value, m)
+ if err != nil {
+ klog.Errorf("Failed to unmarshal mesh config:
%v", err)
+ }
+ a.Mesh = m
+ continue
+ }
+
+ // Process the resources.
+ a.VersionInfo[msg.TypeUrl] = msg.VersionInfo
+ switch msg.TypeUrl {
+ case v3.ListenerType:
listeners := make([]*listener.Listener, 0, len(msg.Resources))
+ for _, rsc := range msg.Resources {
+ valBytes := rsc.Value
+ ll := &listener.Listener{}
+ _ = proto.Unmarshal(valBytes, ll)
+ listeners = append(listeners, ll)
+ }
+ a.handleLDS(listeners)
+ case v3.ClusterType:
clusters := make([]*cluster.Cluster, 0, len(msg.Resources))
+ for _, rsc := range msg.Resources {
+ valBytes := rsc.Value
+ cl := &cluster.Cluster{}
+ _ = proto.Unmarshal(valBytes, cl)
+ clusters = append(clusters, cl)
+ }
+ a.handleCDS(clusters)
+ case v3.EndpointType:
eds := make([]*endpoint.ClusterLoadAssignment, 0, len(msg.Resources))
+ for _, rsc := range msg.Resources {
+ valBytes := rsc.Value
+ el := &endpoint.ClusterLoadAssignment{}
+ _ = proto.Unmarshal(valBytes, el)
+ eds = append(eds, el)
+ }
+ a.handleEDS(eds)
+ case v3.RouteType:
routes := make([]*route.RouteConfiguration, 0, len(msg.Resources))
+ for _, rsc := range msg.Resources {
+ valBytes := rsc.Value
+ rl := &route.RouteConfiguration{}
+ _ = proto.Unmarshal(valBytes, rl)
+ routes = append(routes, rl)
+ }
+ a.handleRDS(routes)
+ default:
+ if isMCP {
+ a.handleMCP(resourceGvk, msg.Resources)
+ }
+ }
+
+ // If we got no resource - still save to the store with empty name/namespace, to notify sync
+ // This scheme also allows us to chunk large responses !
+
+ // TODO: add hook to inject nacks
+
+ a.mutex.Lock()
+ if isMCP {
+ if _, exist := a.sync[resourceGvk.String()]; !exist {
+ a.sync[resourceGvk.String()] = time.Now()
+ }
+ }
+ a.Received[msg.TypeUrl] = msg
+ a.ack(msg)
+ a.mutex.Unlock()
+
+ select {
+ case a.XDSUpdates <- msg:
+ default:
+ }
+ }
+}
+
+// WaitClear will clear the waiting events, so next call to Wait will get
+// the next push type.
+func (a *ADSC) WaitClear() {
+ for {
+ select {
+ case <-a.Updates:
+ default:
+ return
+ }
+ }
+}
+
+// Dial connects to an ADS server, with optional mTLS authentication if a cert dir is specified.
+func (a *ADSC) Dial() error {
+ conn, err := dialWithConfig(context.Background(), &a.cfg.Config)
+ if err != nil {
+ return err
+ }
+ a.conn = conn
+ return nil
+}
+
+// Raw send of a request.
+func (a *ADSC) Send(req *discovery.DiscoveryRequest) error {
+ if a.sendNodeMeta {
+ req.Node = a.node()
+ a.sendNodeMeta = false
+ }
+ req.ResponseNonce = time.Now().String()
+ // if adscLog.DebugEnabled() {
+ // strReq, _ := protomarshal.ToJSONWithIndent(req, " ")
+ // adscLog.Debugf("Sending Discovery Request to istiod: %s", strReq)
+ // }
+ return a.stream.Send(req)
+}
+
+// HasSynced returns true if MCP configs have synced
+func (a *ADSC) HasSynced() bool {
+ if a.cfg == nil || len(a.cfg.InitialDiscoveryRequests) == 0 {
+ return true
+ }
+
+ a.mutex.RLock()
+ defer a.mutex.RUnlock()
+
+ for _, req := range a.cfg.InitialDiscoveryRequests {
+ _, isMCP := convertTypeURLToMCPGVK(req.TypeUrl)
+ if !isMCP {
+ continue
+ }
+
+ if _, ok := a.sync[req.TypeUrl]; !ok {
+ return false
+ }
+ }
+
+ return true
+}
+
+// Run will create a new stream using the existing grpc client connection and send the initial xds requests.
+// It then starts a goroutine that receives and handles xds responses.
+// Note: it is non-blocking.
+func (a *ADSC) Run() error {
+ var err error
+ a.client = discovery.NewAggregatedDiscoveryServiceClient(a.conn)
+ a.stream, err = a.client.StreamAggregatedResources(context.Background())
+ if err != nil {
+ return err
+ }
+ a.sendNodeMeta = true
+ a.initialLoad = 0
+ a.initialLds = false
+ // Send the initial requests
+ for _, r := range a.cfg.InitialDiscoveryRequests {
+ if r.TypeUrl == v3.ClusterType {
+ a.watchTime = time.Now()
+ }
+ _ = a.Send(r)
+ }
+
+ go a.handleRecv()
+ return nil
+}
+
+// Close the stream.
+func (a *ADSC) Close() {
+ a.mutex.Lock()
+ _ = a.conn.Close()
+ a.closed = true
+ a.mutex.Unlock()
+}
+
+func setDefaultConfig(config *Config) Config {
+ if config == nil {
+ config = &Config{}
+ }
+ if config.Namespace == "" {
+ config.Namespace = "default"
+ }
+ return *config
+}
+
+func dialWithConfig(ctx context.Context, config *Config) (*grpc.ClientConn, error) {
+ defaultGrpcDialOptions := defaultGrpcDialOptions()
+ var grpcDialOptions []grpc.DialOption
+ grpcDialOptions = append(grpcDialOptions, defaultGrpcDialOptions...)
+ grpcDialOptions = append(grpcDialOptions, config.GrpcOpts...)
+
+ var err error
+ // If we need MTLS - CertDir or Secrets provider is set.
+ if len(config.CertDir) > 0 || config.SecretManager != nil {
+ tlsCfg, err := tlsConfig(config)
+ if err != nil {
+ return nil, err
+ }
+ creds := credentials.NewTLS(tlsCfg)
grpcDialOptions = append(grpcDialOptions, grpc.WithTransportCredentials(creds))
+ }
+
+ if len(grpcDialOptions) == len(defaultGrpcDialOptions) {
+ // Only disable transport security if the user didn't supply custom dial options
+ grpcDialOptions = append(grpcDialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ }
+
+ conn, err := grpc.DialContext(ctx, config.Address, grpcDialOptions...)
+ if err != nil {
+ return nil, err
+ }
+ return conn, nil
+}
+
+func tlsConfig(config *Config) (*tls.Config, error) {
+ var serverCABytes []byte
+ var err error
+
+ getClientCertificate := getClientCertFn(config)
+
+ // Load the root CAs
+ if config.XDSRootCAFile != "" {
+ serverCABytes, err = os.ReadFile(config.XDSRootCAFile)
+ if err != nil {
+ return nil, err
+ }
+ } else if config.SecretManager != nil {
+ // This is a bit crazy - we could just use the file
+ rootCA, err := config.SecretManager.GenerateSecret(security.RootCertReqResourceName)
+ if err != nil {
+ return nil, err
+ }
+
+ serverCABytes = rootCA.RootCert
+ } else if config.CertDir != "" {
serverCABytes, err = os.ReadFile(config.CertDir + "/root-cert.pem")
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ serverCAs := x509.NewCertPool()
+ if ok := serverCAs.AppendCertsFromPEM(serverCABytes); !ok {
+ return nil, err
+ }
+
+ shost, _, _ := net.SplitHostPort(config.Address)
+ if config.XDSSAN != "" {
+ shost = config.XDSSAN
+ }
+
+ // nolint: gosec
+ // it's insecure only when a user explicitly enables insecure mode.
+ return &tls.Config{
+ GetClientCertificate: getClientCertificate,
+ RootCAs: serverCAs,
+ ServerName: shost,
+ InsecureSkipVerify: config.InsecureSkipVerify,
+ }, nil
+}
+
+func ConfigInitialRequests() []*discovery.DiscoveryRequest {
+ out := make([]*discovery.DiscoveryRequest, 0, len(collections.Sail.All())+1)
+ out = append(out, &discovery.DiscoveryRequest{
+ TypeUrl: gvk.MeshConfig.String(),
+ })
+ for _, sch := range collections.Sail.All() {
+ out = append(out, &discovery.DiscoveryRequest{
+ TypeUrl: sch.GroupVersionKind().String(),
+ })
+ }
+
+ return out
+}
+
+// reconnect will create a new stream
+func (a *ADSC) reconnect() {
+ a.mutex.RLock()
+ if a.closed {
+ a.mutex.RUnlock()
+ return
+ }
+ a.mutex.RUnlock()
+
+ err := a.Run()
+ if err != nil {
+ time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
+ }
+}
+
+func (a *ADSC) ack(msg *discovery.DiscoveryResponse) {
+ var resources []string
+
+ if strings.HasPrefix(msg.TypeUrl, v3.DebugType) {
+ // If the response is for istio.io/debug or istio.io/debug/*,
+ // skip sending the ACK.
+ return
+ }
+
+ if msg.TypeUrl == v3.EndpointType {
+ for c := range a.edsClusters {
+ resources = append(resources, c)
+ }
+ }
+ if msg.TypeUrl == v3.RouteType {
+ for r := range a.routes {
+ resources = append(resources, r)
+ }
+ }
+
+ _ = a.stream.Send(&discovery.DiscoveryRequest{
+ ResponseNonce: msg.Nonce,
+ TypeUrl: msg.TypeUrl,
+ Node: a.node(),
+ VersionInfo: msg.VersionInfo,
+ ResourceNames: resources,
+ })
+}
+
+func (a *ADSC) handleMCP(groupVersionKind config.GroupVersionKind, resources []*anypb.Any) {
+ // Generic - fill up the store
+ if a.Store == nil {
+ return
+ }
+
+ existingConfigs := a.Store.List(groupVersionKind, "")
+
+ received := make(map[string]*config.Config)
+ for _, rsc := range resources {
+ m := &mcp.Resource{}
+ err := rsc.UnmarshalTo(m)
+ if err != nil {
+ klog.Errorf("Error unmarshalling received MCP config
%v", err)
+ continue
+ }
+ newCfg, err := a.mcpToSail(m)
+ if err != nil {
+ klog.Errorf("Invalid data: %v (%v)", err,
string(rsc.Value))
+ continue
+ }
+ if newCfg == nil {
+ continue
+ }
+ received[newCfg.Namespace+"/"+newCfg.Name] = newCfg
+
+ newCfg.GroupVersionKind = groupVersionKind
+ oldCfg := a.Store.Get(newCfg.GroupVersionKind, newCfg.Name, newCfg.Namespace)
+
+ if oldCfg == nil {
+ if _, err = a.Store.Create(*newCfg); err != nil {
+ klog.Errorf("Error adding a new resource to the
store %v", err)
+ continue
+ }
+ } else if oldCfg.ResourceVersion != newCfg.ResourceVersion || newCfg.ResourceVersion == "" {
+ // update the store only when the resource version differs or is unset.
+ // newCfg.Annotations[mem.ResourceVersion] = newCfg.ResourceVersion
+ newCfg.ResourceVersion = oldCfg.ResourceVersion
+ if _, err = a.Store.Update(*newCfg); err != nil {
+ klog.Errorf("Error updating an existing resource in the store %v", err)
+ continue
+ }
+ }
+ }
+
+ // remove deleted resources from cache
+ for _, config := range existingConfigs {
+ if _, ok := received[config.Namespace+"/"+config.Name]; !ok {
+ if err := a.Store.Delete(config.GroupVersionKind, config.Name, config.Namespace, nil); err != nil {
+ klog.Errorf("Error deleting an outdated resource from the store %v", err)
+ continue
+ }
+ }
+ }
+}
+
+func (a *ADSC) mcpToSail(m *mcp.Resource) (*config.Config, error) {
+ if m == nil || m.Metadata == nil {
+ return &config.Config{}, nil
+ }
+ c := &config.Config{
+ Meta: config.Meta{
+ ResourceVersion: m.Metadata.Version,
+ Labels: m.Metadata.Labels,
+ Annotations: m.Metadata.Annotations,
+ },
+ }
+
+ if !config.ObjectInRevision(c, a.cfg.Revision) { // In case upstream does not support rev in node meta.
+ return nil, nil
+ }
+
+ if c.Meta.Annotations == nil {
+ c.Meta.Annotations = make(map[string]string)
+ }
+ nsn := strings.Split(m.Metadata.Name, "/")
+ if len(nsn) != 2 {
+ return nil, fmt.Errorf("invalid name %s", m.Metadata.Name)
+ }
+ c.Namespace = nsn[0]
+ c.Name = nsn[1]
+ var err error
+ c.CreationTimestamp = m.Metadata.CreateTime.AsTime()
+
+ pb, err := m.Body.UnmarshalNew()
+ if err != nil {
+ return nil, err
+ }
+ c.Spec = pb
+ return c, nil
+}
+
+func (a *ADSC) handleCDS(ll []*cluster.Cluster) {
+ cn := make([]string, 0, len(ll))
+ cdsSize := 0
+ edscds := map[string]*cluster.Cluster{}
+ cds := map[string]*cluster.Cluster{}
+ for _, c := range ll {
+ cdsSize += proto.Size(c)
+ switch v := c.ClusterDiscoveryType.(type) {
+ case *cluster.Cluster_Type:
+ if v.Type != cluster.Cluster_EDS {
+ cds[c.Name] = c
+ continue
+ }
+ }
+ cn = append(cn, c.Name)
+ edscds[c.Name] = c
+ }
+
+ klog.Infof("CDS: %d size=%d", len(cn), cdsSize)
+
+ if len(cn) > 0 {
+ a.sendRsc(v3.EndpointType, cn)
+ }
+
+ a.mutex.Lock()
+ defer a.mutex.Unlock()
+ a.edsClusters = edscds
+ a.clusters = cds
+
+ select {
+ case a.Updates <- v3.ClusterType:
+ default:
+ }
+}
+
+func (a *ADSC) handleEDS(eds []*endpoint.ClusterLoadAssignment) {
+ la := map[string]*endpoint.ClusterLoadAssignment{}
+ edsSize := 0
+ ep := 0
+ for _, cla := range eds {
+ edsSize += proto.Size(cla)
+ la[cla.ClusterName] = cla
+ ep += len(cla.Endpoints)
+ }
+
+ klog.Infof("eds: %d size=%d ep=%d", len(eds), edsSize, ep)
+ if a.initialLoad == 0 && !a.initialLds {
+ // first load - Envoy loads listeners after endpoints
+ _ = a.stream.Send(&discovery.DiscoveryRequest{
+ Node: a.node(),
+ TypeUrl: v3.ListenerType,
+ })
+ a.initialLds = true
+ }
+
+ a.mutex.Lock()
+ defer a.mutex.Unlock()
+ a.eds = la
+
+ select {
+ case a.Updates <- v3.EndpointType:
+ default:
+ }
+}
+
+// nolint: staticcheck
+func (a *ADSC) handleLDS(ll []*listener.Listener) {
+ lh := map[string]*listener.Listener{}
+ lt := map[string]*listener.Listener{}
+
+ routes := []string{}
+ ldsSize := 0
+
+ for _, l := range ll {
+ ldsSize += proto.Size(l)
+
+ // The last filter is the actual destination for the inbound listener
+ if l.ApiListener != nil {
+ // This is an API Listener
+ // TODO: extract VIP and RDS or cluster
+ continue
+ }
+ fc := l.FilterChains[len(l.FilterChains)-1]
+ // Find the terminal filter
+ filter := fc.Filters[len(fc.Filters)-1]
+
+ // The actual destination will be the next to the last if the last filter is a passthrough filter
+ if fc.GetName() == util.PassthroughFilterChain {
+ fc = l.FilterChains[len(l.FilterChains)-2]
+ filter = fc.Filters[len(fc.Filters)-1]
+ }
+
+ switch filter.Name {
+ case wellknown.TCPProxy:
+ lt[l.Name] = l
config, _ := protomarshal.MessageToStructSlow(filter.GetTypedConfig())
+ c := config.Fields["cluster"].GetStringValue()
+ klog.V(2).Infof("TCP: %s -> %s", l.Name, c)
+ case wellknown.HTTPConnectionManager:
+ lh[l.Name] = l
+
+ // Getting from config is too painful..
+ port := l.Address.GetSocketAddress().GetPortValue()
+ if port == 15002 {
+ routes = append(routes, "http_proxy")
+ } else {
+ routes = append(routes, fmt.Sprintf("%d", port))
+ }
+ default:
+ klog.Infof(protomarshal.ToJSONWithIndent(l, " "))
+ }
+ }
+
+ klog.Infof("LDS: http=%d tcp=%d size=%d", len(lh), len(lt), ldsSize)
+
+ a.mutex.Lock()
+ defer a.mutex.Unlock()
+ if len(routes) > 0 {
+ a.sendRsc(v3.RouteType, routes)
+ }
+ a.httpListeners = lh
+ a.tcpListeners = lt
+
+ select {
+ case a.Updates <- v3.ListenerType:
+ default:
+ }
+}
+
+func (a *ADSC) sendRsc(typeurl string, rsc []string) {
+ ex := a.Received[typeurl]
+ version := ""
+ nonce := ""
+ if ex != nil {
+ version = ex.VersionInfo
+ nonce = ex.Nonce
+ }
+ _ = a.stream.Send(&discovery.DiscoveryRequest{
+ ResponseNonce: nonce,
+ VersionInfo: version,
+ Node: a.node(),
+ TypeUrl: typeurl,
+ ResourceNames: rsc,
+ })
+}
+
+func (a *ADSC) handleRDS(configurations []*route.RouteConfiguration) {
+ vh := 0
+ rcount := 0
+ size := 0
+
+ rds := map[string]*route.RouteConfiguration{}
+
+ for _, r := range configurations {
+ for _, h := range r.VirtualHosts {
+ vh++
+ for _, rt := range h.Routes {
+ rcount++
+ // Example: match:<prefix:"/" >
route:<cluster:"outbound|9154||load-se-154.local" ...
+ klog.V(2).Infof("Handle route %v, path %v,
cluster %v", h.Name, rt.Match.PathSpecifier, rt.GetRoute().GetCluster())
+ }
+ }
+ rds[r.Name] = r
+ size += proto.Size(r)
+ }
+ if a.initialLoad == 0 {
+ a.initialLoad = time.Since(a.watchTime)
+ klog.Infof("RDS: %d size=%d vhosts=%d routes=%d time=%d",
len(configurations), size, vh, rcount, a.initialLoad)
+ } else {
+ klog.Infof("RDS: %d size=%d vhosts=%d routes=%d",
len(configurations), size, vh, rcount)
+ }
+
+ a.mutex.Lock()
+ a.routes = rds
+ a.mutex.Unlock()
+
+ select {
+ case a.Updates <- v3.RouteType:
+ default:
+ }
+}
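
Reviewer note: a minimal usage sketch of the new ADSC client, not part of this commit; the discovery address is a placeholder and error handling is abbreviated:

    // Connect, start the receive loop, and wait for the first push.
    cfg := &adsc.ADSConfig{
        Config: adsc.Config{
            Namespace: "default",
            Workload:  "test",
        },
        // Watch mesh config plus every schema registered in collections.Sail.
        InitialDiscoveryRequests: adsc.ConfigInitialRequests(),
    }
    client, err := adsc.New("dubbod.dubbo-system.svc:15010", cfg) // placeholder address
    if err != nil {
        log.Fatal(err)
    }
    if err := client.Run(); err != nil { // non-blocking; spawns handleRecv
        log.Fatal(err)
    }
    typeURL := <-client.Updates // type URL of the first processed push
    log.Printf("first update: %v", typeURL)
    client.Close()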
diff --git a/pkg/adsc/util.go b/pkg/adsc/util.go
new file mode 100644
index 00000000..48945b87
--- /dev/null
+++ b/pkg/adsc/util.go
@@ -0,0 +1,57 @@
+package adsc
+
+import (
+ "crypto/tls"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+ "strings"
+)
+
+func getClientCertFn(config *Config) func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
+ if config.SecretManager != nil {
+ return func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
+ key, err := config.SecretManager.GenerateSecret(security.WorkloadKeyCertResourceName)
+ if err != nil {
+ return nil, err
+ }
+ clientCert, err := tls.X509KeyPair(key.CertificateChain, key.PrivateKey)
+ if err != nil {
+ return nil, err
+ }
+ return &clientCert, nil
+ }
+ }
+ if config.CertDir != "" {
+ return func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
+ certName := config.CertDir + "/cert-chain.pem"
+ clientCert, err := tls.LoadX509KeyPair(certName, config.CertDir+"/key.pem")
+ if err != nil {
+ return nil, err
+ }
+ return &clientCert, nil
+ }
+ }
+
+ return nil
+}
+
+func convertTypeURLToMCPGVK(typeURL string) (config.GroupVersionKind, bool) {
+ parts := strings.SplitN(typeURL, "/", 3)
+ if len(parts) != 3 {
+ return config.GroupVersionKind{}, false
+ }
+
+ gvk := config.GroupVersionKind{
+ Group: parts[0],
+ Version: parts[1],
+ Kind: parts[2],
+ }
+
+ _, isMCP := collections.Sail.FindByGroupVersionKind(gvk)
+ if isMCP {
+ return gvk, true
+ }
+
+ return config.GroupVersionKind{}, false
+}
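
Reviewer note: convertTypeURLToMCPGVK treats a type URL as "group/version/kind" and only accepts GVKs registered in collections.Sail. A sketch of the expected behavior (illustrative GVK; Sail is built empty in this commit, so every lookup currently returns false):

    // "networking.istio.io/v1alpha3/ServiceEntry" splits into
    // {Group: "networking.istio.io", Version: "v1alpha3", Kind: "ServiceEntry"}.
    gvk, isMCP := convertTypeURLToMCPGVK("networking.istio.io/v1alpha3/ServiceEntry")
    if !isMCP {
        // Not registered in collections.Sail: handleRecv falls through to the
        // typed xDS handlers (LDS/CDS/EDS/RDS) instead of the MCP path.
    }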
diff --git a/pkg/config/schema/collection/schemas.go b/pkg/config/schema/collection/schemas.go
new file mode 100644
index 00000000..bbf9e618
--- /dev/null
+++ b/pkg/config/schema/collection/schemas.go
@@ -0,0 +1,232 @@
+package collection
+
+import (
+ "fmt"
+
+ "github.com/google/go-cmp/cmp"
+ "github.com/hashicorp/go-multierror"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/resource"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+)
+
+// Schemas contains metadata about configuration resources.
+type Schemas struct {
+ byCollection map[config.GroupVersionKind]resource.Schema
+ byAddOrder []resource.Schema
+}
+
+// SchemasFor is a shortcut for creating Schemas. It uses MustAdd for each element.
+func SchemasFor(schemas ...resource.Schema) Schemas {
+ b := NewSchemasBuilder()
+ for _, s := range schemas {
+ b.MustAdd(s)
+ }
+ return b.Build()
+}
+
+// SchemasBuilder is a builder for the schemas type.
+type SchemasBuilder struct {
+ schemas Schemas
+}
+
+// NewSchemasBuilder returns a new instance of SchemasBuilder.
+func NewSchemasBuilder() *SchemasBuilder {
+ s := Schemas{
+ byCollection: make(map[config.GroupVersionKind]resource.Schema),
+ }
+
+ return &SchemasBuilder{
+ schemas: s,
+ }
+}
+
+// Add a new collection to the schemas.
+func (b *SchemasBuilder) Add(s resource.Schema) error {
+ if _, found := b.schemas.byCollection[s.GroupVersionKind()]; found {
+ return fmt.Errorf("collection already exists: %v",
s.GroupVersionKind())
+ }
+
+ b.schemas.byCollection[s.GroupVersionKind()] = s
+ b.schemas.byAddOrder = append(b.schemas.byAddOrder, s)
+ return nil
+}
+
+// MustAdd calls Add and panics if it fails.
+func (b *SchemasBuilder) MustAdd(s resource.Schema) *SchemasBuilder {
+ if err := b.Add(s); err != nil {
+ panic(fmt.Sprintf("SchemasBuilder.MustAdd: %v", err))
+ }
+ return b
+}
+
+// Build a new schemas from this SchemasBuilder.
+func (b *SchemasBuilder) Build() Schemas {
+ s := b.schemas
+
+ // Avoid modifying after Build.
+ b.schemas = Schemas{}
+
+ return s
+}
+
+// ForEach executes the given function on each contained schema, until the function returns true.
+func (s Schemas) ForEach(handleSchema func(resource.Schema) (done bool)) {
+ for _, schema := range s.byAddOrder {
+ if handleSchema(schema) {
+ return
+ }
+ }
+}
+
+func (s Schemas) Union(otherSchemas Schemas) Schemas {
+ resultBuilder := NewSchemasBuilder()
+ for _, myschema := range s.All() {
+ // an error indicates the schema has already been added, which doesn't negatively impact the union
+ _ = resultBuilder.Add(myschema)
+ }
+ for _, myschema := range otherSchemas.All() {
+ // an error indicates the schema has already been added, which doesn't negatively impact the union
+ _ = resultBuilder.Add(myschema)
+ }
+ return resultBuilder.Build()
+}
+
+func (s Schemas) Intersect(otherSchemas Schemas) Schemas {
+ resultBuilder := NewSchemasBuilder()
+
+ schemaLookup := sets.String{}
+ for _, myschema := range s.All() {
+ schemaLookup.Insert(myschema.String())
+ }
+
+ // Only add schemas that are in both sets
+ for _, myschema := range otherSchemas.All() {
+ if schemaLookup.Contains(myschema.String()) {
+ _ = resultBuilder.Add(myschema)
+ }
+ }
+ return resultBuilder.Build()
+}
+
+// FindByGroupVersionKind searches and returns the first schema with the given GVK
+func (s Schemas) FindByGroupVersionKind(gvk config.GroupVersionKind) (resource.Schema, bool) {
+ for _, rs := range s.byAddOrder {
+ if rs.GroupVersionKind() == gvk {
+ return rs, true
+ }
+ }
+
+ return nil, false
+}
+
+// FindByGroupVersionAliasesKind searches and returns the first schema with the given GVK;
+// if not found, it searches the schema's version aliases for a match.
+func (s Schemas) FindByGroupVersionAliasesKind(gvk config.GroupVersionKind) (resource.Schema, bool) {
+ for _, rs := range s.byAddOrder {
+ for _, va := range rs.GroupVersionAliasKinds() {
+ if va == gvk {
+ return rs, true
+ }
+ }
+ }
+ return nil, false
+}
+
+// FindByGroupKind searches and returns the first schema with the given GVK, ignoring versions.
+// Generally it's a good idea to use FindByGroupVersionAliasesKind, which validates the version as well.
+// FindByGroupKind provides future-proofing against versions we don't yet know about; given we don't know them, it's risky.
+func (s Schemas) FindByGroupKind(gvk config.GroupVersionKind) (resource.Schema, bool) {
+ for _, rs := range s.byAddOrder {
+ if rs.Group() == gvk.Group && rs.Kind() == gvk.Kind {
+ return rs, true
+ }
+ }
+ return nil, false
+}
+
+// FindByGroupVersionResource searches and returns the first schema with the given GVR
+func (s Schemas) FindByGroupVersionResource(gvr schema.GroupVersionResource) (resource.Schema, bool) {
+ for _, rs := range s.byAddOrder {
+ if rs.GroupVersionResource() == gvr {
+ return rs, true
+ }
+ }
+
+ return nil, false
+}
+
+// All returns all known Schemas
+func (s Schemas) All() []resource.Schema {
+ return slices.Clone(s.byAddOrder)
+}
+
+// GroupVersionKinds returns all known GroupVersionKinds
+func (s Schemas) GroupVersionKinds() []config.GroupVersionKind {
+ res := []config.GroupVersionKind{}
+ for _, r := range s.All() {
+ res = append(res, r.GroupVersionKind())
+ }
+ return res
+}
+
+// Add creates a copy of this Schemas with the given schemas added.
+func (s Schemas) Add(toAdd ...resource.Schema) Schemas {
+ b := NewSchemasBuilder()
+
+ for _, s := range s.byAddOrder {
+ b.MustAdd(s)
+ }
+
+ for _, s := range toAdd {
+ b.MustAdd(s)
+ }
+
+ return b.Build()
+}
+
+// Remove creates a copy of this Schemas with the given schemas removed.
+func (s Schemas) Remove(toRemove ...resource.Schema) Schemas {
+ b := NewSchemasBuilder()
+
+ for _, s := range s.byAddOrder {
+ shouldAdd := true
+ for _, r := range toRemove {
+ if r.Equal(s) {
+ shouldAdd = false
+ break
+ }
+ }
+ if shouldAdd {
+ b.MustAdd(s)
+ }
+ }
+
+ return b.Build()
+}
+
+// Kinds returns all known resource kinds.
+func (s Schemas) Kinds() []string {
+ kinds := sets.NewWithLength[string](len(s.byAddOrder))
+ for _, s := range s.byAddOrder {
+ kinds.Insert(s.Kind())
+ }
+
+ out := kinds.UnsortedList()
+ return slices.Sort(out)
+}
+
+// Validate the schemas. Returns error if there is a problem.
+func (s Schemas) Validate() (err error) {
+ for _, c := range s.byAddOrder {
+ err = multierror.Append(err, c.Validate()).ErrorOrNil()
+ }
+ return
+}
+
+func (s Schemas) Equal(o Schemas) bool {
+ return cmp.Equal(s.byAddOrder, o.byAddOrder)
+}
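
Reviewer note: a sketch of how the new Schemas set is built and queried. The GVK is hypothetical, and BuildNoValidate is used because the example proto name is a placeholder:

    exampleSchema := resource.Builder{
        Group:   "example.dubbo.io",
        Version: "v1alpha1",
        Kind:    "exampleconfig",
        Plural:  "exampleconfigs",
        Proto:   "google.protobuf.Struct",
    }.BuildNoValidate()

    schemas := collection.NewSchemasBuilder().MustAdd(exampleSchema).Build()
    if s, ok := schemas.FindByGroupVersionKind(config.GroupVersionKind{
        Group: "example.dubbo.io", Version: "v1alpha1", Kind: "exampleconfig",
    }); ok {
        fmt.Println("found:", s.GroupVersionKind())
    }
    // Union, Intersect, Add, and Remove all return copies; the receiver is
    // never mutated, and Build() resets the builder to prevent reuse.
    merged := schemas.Union(collection.SchemasFor(exampleSchema))
    _ = merged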
diff --git a/pkg/config/schema/collections/collections.go b/pkg/config/schema/collections/collections.go
new file mode 100644
index 00000000..d38deddd
--- /dev/null
+++ b/pkg/config/schema/collections/collections.go
@@ -0,0 +1,14 @@
+//go:build !agent
+// +build !agent
+
+package collections
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+)
+
+var (
+ Sail = collection.NewSchemasBuilder().
+ // TODO MustAdd
+ Build()
+)
diff --git a/pkg/config/schema/resource/schema.go b/pkg/config/schema/resource/schema.go
new file mode 100644
index 00000000..5012b662
--- /dev/null
+++ b/pkg/config/schema/resource/schema.go
@@ -0,0 +1,369 @@
+package resource
+
+import (
+ "errors"
+ "fmt"
+ "reflect"
+
+ "github.com/hashicorp/go-multierror"
+ "google.golang.org/protobuf/reflect/protoreflect"
+ "google.golang.org/protobuf/reflect/protoregistry"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+ "github.com/apache/dubbo-kubernetes/pkg/config/validation"
+)
+
+// Schema for a resource.
+type Schema interface {
+ fmt.Stringer
+
+ // GroupVersionKind of the resource. This is the only way to uniquely identify a resource.
+ GroupVersionKind() config.GroupVersionKind
+
+ // GroupVersionResource of the resource.
+ GroupVersionResource() schema.GroupVersionResource
+
+ // IsClusterScoped indicates whether this resource is cluster-scoped, rather than scoped to a particular namespace within a cluster.
+ IsClusterScoped() bool
+
+ // IsBuiltin indicates that this resource is builtin (not a CRD)
+ IsBuiltin() bool
+
+ // Identifier returns a unique identifier for the resource
+ Identifier() string
+
+ // Kind for this resource.
+ Kind() string
+
+ // Plural returns the plural form of the Kind.
+ Plural() string
+
+ // Group for this resource.
+ Group() string
+
+ // Version of this resource.
+ Version() string
+
+ // GroupVersionAliasKinds is the GVK of this resource,
+ // but the version is from its version aliases to perform version conversion.
+ GroupVersionAliasKinds() []config.GroupVersionKind
+
+ // APIVersion is a utility that returns a k8s API version string of the form "Group/Version".
+ APIVersion() string
+
+ // Proto returns the protocol buffer type name for this resource.
+ Proto() string
+
+ // ProtoPackage returns the golang package for the protobuf resource.
+ ProtoPackage() string
+
+ // NewInstance returns a new instance of the protocol buffer message for this resource.
+ NewInstance() (config.Spec, error)
+
+ // Status returns the associated status of the schema
+ Status() (config.Status, error)
+
+ // StatusKind returns the Kind of the status field. If unset, the field does not support status.
+ StatusKind() string
+ StatusPackage() string
+
+ // MustNewInstance calls NewInstance and panics if an error occurs.
+ MustNewInstance() config.Spec
+
+ // Validate this schema.
+ Validate() error
+
+ // ValidateConfig validates that the given config message is of the correct type for this schema
+ // and that the contents are valid.
+ ValidateConfig(cfg config.Config) (validation.Warning, error)
+
+ // Equal is a helper function for testing equality between Schema instances. This supports comparison
+ // with the cmp library.
+ Equal(other Schema) bool
+}
+
+// Builder for a Schema.
+type Builder struct {
+ // ClusterScoped is true for cluster-scoped resources.
+ ClusterScoped bool
+
+ // Synthetic is true for resources that do not actually exist in a cluster
+ Synthetic bool
+
+ // Builtin is true for resources that are builtin (not CRD)
+ Builtin bool
+
+ // Identifier is the unique identifier for the resource
+ Identifier string
+
+ // Kind is the config proto type.
+ Kind string
+
+ // Plural is the type in plural.
+ Plural string
+
+ // Group is the config proto group.
+ Group string
+
+ // Version is the config proto version.
+ Version string
+
+ // VersionAliases is the config proto version aliases.
+ VersionAliases []string
+
+ // Proto refers to the protobuf message type name corresponding to the type
+ Proto string
+
+ StatusProto string
+
+ // ReflectType is the type of the go struct
+ ReflectType reflect.Type
+
+ // StatusType is the type of the associated status.
+ StatusType reflect.Type
+
+ // ProtoPackage refers to the name of the golang package for the protobuf message.
+ ProtoPackage string
+
+ // StatusPackage refers to the name of the golang status package.
+ StatusPackage string
+
+ // ValidateProto performs validation on protobuf messages based on this schema.
+ ValidateProto validation.ValidateFunc
+}
+
+// Build a Schema instance.
+func (b Builder) Build() (Schema, error) {
+ s := b.BuildNoValidate()
+
+ // Validate the schema.
+ if err := s.Validate(); err != nil {
+ return nil, err
+ }
+
+ return s, nil
+}
+
+// MustBuild calls Build and panics if it fails.
+func (b Builder) MustBuild() Schema {
+ s, err := b.Build()
+ if err != nil {
+ panic(fmt.Sprintf("MustBuild: %v", err))
+ }
+ return s
+}
+
+// BuildNoValidate builds the Schema without checking the fields.
+func (b Builder) BuildNoValidate() Schema {
+ if b.ValidateProto == nil {
+ b.ValidateProto = validation.EmptyValidate
+ }
+
+ return &schemaImpl{
+ clusterScoped: b.ClusterScoped,
+ synthetic: b.Synthetic,
+ builtin: b.Builtin,
+ gvk: config.GroupVersionKind{
+ Group: b.Group,
+ Version: b.Version,
+ Kind: b.Kind,
+ },
+ plural: b.Plural,
+ apiVersion: b.Group + "/" + b.Version,
+ versionAliases: b.VersionAliases,
+ proto: b.Proto,
+ goPackage: b.ProtoPackage,
+ identifier: b.Identifier,
+ reflectType: b.ReflectType,
+ validateConfig: b.ValidateProto,
+ statusType: b.StatusType,
+ statusPackage: b.StatusPackage,
+ }
+}
+
+type schemaImpl struct {
+ clusterScoped bool
+ builtin bool
+ gvk config.GroupVersionKind
+ versionAliases []string
+ plural string
+ apiVersion string
+ proto string
+ goPackage string
+ validateConfig validation.ValidateFunc
+ reflectType reflect.Type
+ statusType reflect.Type
+ statusPackage string
+ identifier string
+ synthetic bool
+}
+
+func (s *schemaImpl) GroupVersionKind() config.GroupVersionKind {
+ return s.gvk
+}
+
+func (s *schemaImpl) GroupVersionResource() schema.GroupVersionResource {
+ return schema.GroupVersionResource{
+ Group: s.Group(),
+ Version: s.Version(),
+ Resource: s.Plural(),
+ }
+}
+
+func (s *schemaImpl) IsClusterScoped() bool {
+ return s.clusterScoped
+}
+
+func (s *schemaImpl) IsBuiltin() bool {
+ return s.builtin
+}
+
+func (s *schemaImpl) Identifier() string {
+ return s.identifier
+}
+
+func (s *schemaImpl) Kind() string {
+ return s.gvk.Kind
+}
+
+func (s *schemaImpl) Plural() string {
+ return s.plural
+}
+
+func (s *schemaImpl) Group() string {
+ return s.gvk.Group
+}
+
+func (s *schemaImpl) Version() string {
+ return s.gvk.Version
+}
+
+func (s *schemaImpl) GroupVersionAliasKinds() []config.GroupVersionKind {
+ gvks := make([]config.GroupVersionKind, len(s.versionAliases))
+ for i, va := range s.versionAliases {
+ gvks[i] = s.gvk
+ gvks[i].Version = va
+ }
+ gvks = append(gvks, s.GroupVersionKind())
+ return gvks
+}
+
+func (s *schemaImpl) APIVersion() string {
+ return s.apiVersion
+}
+
+func (s *schemaImpl) Proto() string {
+ return s.proto
+}
+
+func (s *schemaImpl) ProtoPackage() string {
+ return s.goPackage
+}
+
+func (s *schemaImpl) StatusPackage() string {
+ return s.statusPackage
+}
+
+func (s *schemaImpl) Validate() (err error) {
+ if !labels.IsDNS1123Label(s.Kind()) {
+ err = multierror.Append(err, fmt.Errorf("invalid kind: %s",
s.Kind()))
+ }
+ if !labels.IsDNS1123Label(s.plural) {
+ err = multierror.Append(err, fmt.Errorf("invalid plural for
kind %s: %s", s.Kind(), s.plural))
+ }
+ if s.reflectType == nil && getProtoMessageType(s.proto) == nil {
+ err = multierror.Append(err, fmt.Errorf("proto message or
reflect type not found: %v", s.proto))
+ }
+ return
+}
+
+func (s *schemaImpl) String() string {
+ return fmt.Sprintf("[Schema](%s, %q, %s)", s.Kind(), s.goPackage,
s.proto)
+}
+
+func (s *schemaImpl) NewInstance() (config.Spec, error) {
+ rt := s.reflectType
+ var instance any
+ if rt == nil {
+ // Use proto
+ t, err := protoMessageType(protoreflect.FullName(s.proto))
+ if err != nil || t == nil {
+ return nil, errors.New("failed to find reflect type")
+ }
+ instance = t.New().Interface()
+ } else {
+ instance = reflect.New(rt).Interface()
+ }
+
+ p, ok := instance.(config.Spec)
+ if !ok {
+ return nil, fmt.Errorf(
+ "newInstance: message is not an instance of
config.Spec. kind:%s, type:%v, value:%v",
+ s.Kind(), rt, instance)
+ }
+ return p, nil
+}
+
+func (s *schemaImpl) Status() (config.Status, error) {
+ statTyp := s.statusType
+ if statTyp == nil {
+ return nil, errors.New("unknown status type")
+ }
+ instance := reflect.New(statTyp).Interface()
+ p, ok := instance.(config.Status)
+ if !ok {
+ return nil, fmt.Errorf("status: statusType not an instance of
config.Status. type: %v, value: %v", statTyp, instance)
+ }
+ return p, nil
+}
+
+func (s *schemaImpl) StatusKind() string {
+ if s.statusType == nil {
+ return ""
+ }
+ return s.statusType.Name()
+}
+
+func (s *schemaImpl) MustNewInstance() config.Spec {
+ p, err := s.NewInstance()
+ if err != nil {
+ panic(err)
+ }
+ return p
+}
+
+func (s *schemaImpl) ValidateConfig(cfg config.Config) (validation.Warning, error) {
+ return s.validateConfig(cfg)
+}
+
+func (s *schemaImpl) Equal(o Schema) bool {
+ return s.IsClusterScoped() == o.IsClusterScoped() &&
+ s.Kind() == o.Kind() &&
+ s.Plural() == o.Plural() &&
+ s.Group() == o.Group() &&
+ s.Version() == o.Version() &&
+ s.Proto() == o.Proto() &&
+ s.ProtoPackage() == o.ProtoPackage()
+}
+
+// FromKubernetesGVK converts a Kubernetes GVK to an Istio GVK
+func FromKubernetesGVK(in *schema.GroupVersionKind) config.GroupVersionKind {
+ return config.GroupVersionKind{
+ Group: in.Group,
+ Version: in.Version,
+ Kind: in.Kind,
+ }
+}
+
+// getProtoMessageType returns the Go type of the proto with the specified name.
+func getProtoMessageType(protoMessageName string) reflect.Type {
+ t, err := protoMessageType(protoreflect.FullName(protoMessageName))
+ if err != nil || t == nil {
+ return nil
+ }
+ return reflect.TypeOf(t.Zero().Interface())
+}
+
+var protoMessageType = protoregistry.GlobalTypes.FindMessageByName
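
Reviewer note: a sketch of how NewInstance resolves specs: with ReflectType set it reflects a new struct, otherwise it falls back to the global proto registry keyed by Proto. The spec type is a placeholder and is assumed to satisfy config.Spec:

    type exampleSpec struct{ Hosts []string }

    s := resource.Builder{
        Group:       "example.dubbo.io",
        Version:     "v1alpha1",
        Kind:        "exampleconfig",
        Plural:      "exampleconfigs",
        ReflectType: reflect.TypeOf(&exampleSpec{}).Elem(),
    }.BuildNoValidate()

    spec := s.MustNewInstance() // *exampleSpec via reflect.New
    _ = spec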
diff --git a/pkg/config/validation/validation.go b/pkg/config/validation/validation.go
index 2aba93ca..8fb7ea7c 100644
--- a/pkg/config/validation/validation.go
+++ b/pkg/config/validation/validation.go
@@ -17,8 +17,21 @@
package validation
-import "github.com/apache/dubbo-kubernetes/pkg/config/validation/agent"
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/validation/agent"
+)
+
+var (
+ // EmptyValidate is a Validate that does nothing and returns no error.
+ EmptyValidate = func(config.Config) (Warning, error) {
+ return nil, nil
+ }
+)
type (
Warning = agent.Warning
)
+
+// ValidateFunc defines a validation func for an API proto.
+type ValidateFunc func(config config.Config) (Warning, error)
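
Reviewer note: a sketch of a custom ValidateFunc as it would be supplied via resource.Builder.ValidateProto. The checks are purely illustrative; it assumes config.Config carries Name and Spec:

    var validateExample validation.ValidateFunc = func(cfg config.Config) (validation.Warning, error) {
        if cfg.Name == "" {
            return nil, fmt.Errorf("name is required") // hard error: config rejected
        }
        if cfg.Spec == nil {
            return fmt.Errorf("empty spec"), nil // warning only: config accepted
        }
        return nil, nil
    }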
diff --git a/pkg/kube/collection/collection.go b/pkg/kube/collection/schemas.go
similarity index 100%
rename from pkg/kube/collection/collection.go
rename to pkg/kube/collection/schemas.go
diff --git a/pkg/kube/krt/core.go b/pkg/kube/krt/core.go
index 4dfa676f..37779049 100644
--- a/pkg/kube/krt/core.go
+++ b/pkg/kube/krt/core.go
@@ -128,3 +128,9 @@ type internalCollection[T any] interface {
// Create a new index into the collection
index(name string, extract func(o T) []string) indexer[T]
}
+
+// Namespacer is an optional interface that can be implemented by collection types.
+// If implemented, this will be used to determine an object's Namespace.
+type Namespacer interface {
+ GetNamespace() string
+}
diff --git a/pkg/kube/krt/debug.go b/pkg/kube/krt/debug.go
index e8a62eeb..3ba58688 100644
--- a/pkg/kube/krt/debug.go
+++ b/pkg/kube/krt/debug.go
@@ -72,3 +72,18 @@ func eraseMap[T any](l map[Key[T]]T) map[string]any {
}
return nm
}
+
+// maybeRegisterCollectionForDebugging registers the collection in the debugger, if one is enabled
+func maybeRegisterCollectionForDebugging[T any](c Collection[T], handler *DebugHandler) {
+ if handler == nil {
+ return
+ }
+ cc := c.(internalCollection[T])
+ handler.mu.Lock()
+ defer handler.mu.Unlock()
+ handler.debugCollections = append(handler.debugCollections, DebugCollection{
+ name: cc.name(),
+ dump: cc.dump,
+ uid: cc.uid(),
+ })
+}
diff --git a/pkg/kube/krt/files/files.go b/pkg/kube/krt/files/files.go
index 9d222dcd..9f4e6d3e 100644
--- a/pkg/kube/krt/files/files.go
+++ b/pkg/kube/krt/files/files.go
@@ -21,7 +21,14 @@ import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/filewatcher"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/fsnotify/fsnotify"
"go.uber.org/atomic"
+ "k8s.io/klog/v2"
+ "os"
+ "path/filepath"
+ "sync"
"time"
)
@@ -29,12 +36,7 @@ type FileSingleton[T any] struct {
krt.Singleton[T]
}
-func NewFileSingleton[T any](
- fileWatcher filewatcher.FileWatcher,
- filename string,
- readFile func(filename string) (T, error),
- opts ...krt.CollectionOption,
-) (FileSingleton[T], error) {
+func NewFileSingleton[T any](fileWatcher filewatcher.FileWatcher, filename string, readFile func(filename string) (T, error), opts ...krt.CollectionOption) (FileSingleton[T], error) {
cfg, err := readFile(filename)
if err != nil {
return FileSingleton[T]{}, err
@@ -61,6 +63,26 @@ func NewFileSingleton[T any](
return FileSingleton[T]{sc}, nil
}
+type FileCollection[T any] struct {
+ krt.StaticCollection[T]
+ read func() []T
+}
+
+func NewFileCollection[F any, O any](w *FolderWatch[F], transform func(F) *O, opts ...krt.CollectionOption) FileCollection[O] {
+ res := FileCollection[O]{
+ read: func() []O {
+ return readSnapshot[F, O](w, transform)
+ },
+ }
+ sc := krt.NewStaticCollection[O](nil, res.read(), opts...)
+ w.subscribe(func() {
+ now := res.read()
+ sc.Reset(now)
+ })
+ res.StaticCollection = sc
+ return res
+}
+
func watchFile(fileWatcher filewatcher.FileWatcher, file string, stop <-chan struct{}, callback func()) {
_ = fileWatcher.Add(file)
go func() {
@@ -80,3 +102,168 @@ func watchFile(fileWatcher filewatcher.FileWatcher, file string, stop <-chan str
}
}()
}
+
+var supportedExtensions = sets.New(".yaml", ".yml")
+
+const watchDebounceDelay = 50 * time.Millisecond
+
+type FolderWatch[T any] struct {
+ root string
+ parse func([]byte) ([]T, error)
+
+ mu sync.RWMutex
+ state []T
+ callbacks []func()
+}
+
+func NewFolderWatch[T any](fileDir string, parse func([]byte) ([]T, error), stop <-chan struct{}) (*FolderWatch[T], error) {
+ fw := &FolderWatch[T]{root: fileDir, parse: parse}
+ // Read initial state
+ if err := fw.readOnce(); err != nil {
+ return nil, err
+ }
+ fw.watch(stop)
+ return fw, nil
+}
+
+func (f *FolderWatch[T]) readOnce() error {
+ var result []T
+
+ err := filepath.Walk(f.root, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ } else if !supportedExtensions.Contains(filepath.Ext(path)) || (info.Mode()&os.ModeType) != 0 {
+ return nil
+ }
+ data, err := os.ReadFile(path)
+ if err != nil {
+ klog.Errorf("Failed to readOnce %s: %v", path, err)
+ return err
+ }
+ parsed, err := f.parse(data)
+ if err != nil {
+ klog.Errorf("Failed to parse %s: %v", path, err)
+ return err
+ }
+ result = append(result, parsed...)
+ return nil
+ })
+ if err != nil {
+ klog.Errorf("failure during filepath.Walk: %v", err)
+ }
+
+ if err != nil {
+ return err
+ }
+
+ f.mu.Lock()
+ f.state = result
+ cb := slices.Clone(f.callbacks)
+ f.mu.Unlock()
+ for _, c := range cb {
+ c()
+ }
+ return nil
+}
+
+func (f *FolderWatch[T]) watch(stop <-chan struct{}) {
+ c := make(chan struct{}, 1)
+ if err := f.fileTrigger(c, stop); err != nil {
+ klog.Errorf("Unable to setup FileTrigger for %s: %v", f.root,
err)
+ return
+ }
+ // Run the close loop asynchronously.
+ go func() {
+ for {
+ select {
+ case <-c:
+ klog.Infof("Triggering reload of file
configuration")
+ if err := f.readOnce(); err != nil {
+ klog.Errorf("unable to reload file
configuration %v: %v", f.root, err)
+ }
+ case <-stop:
+ return
+ }
+ }
+ }()
+}
+
+func (f *FolderWatch[T]) get() []T {
+ f.mu.RLock()
+ defer f.mu.RUnlock()
+ return f.state
+}
+
+func (f *FolderWatch[T]) subscribe(fn func()) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.callbacks = append(f.callbacks, fn)
+}
+
+func (f *FolderWatch[T]) fileTrigger(events chan struct{}, stop <-chan struct{}) error {
+ fs, err := fsnotify.NewWatcher()
+ if err != nil {
+ return err
+ }
+ watcher := recursiveWatcher{fs}
+ if err = watcher.watchRecursive(f.root); err != nil {
+ return err
+ }
+ go func() {
+ defer watcher.Close()
+ var debounceC <-chan time.Time
+ for {
+ select {
+ case <-debounceC:
+ debounceC = nil
+ events <- struct{}{}
+ case e := <-watcher.Events:
+ s, err := os.Stat(e.Name)
+ if err == nil && s != nil && s.IsDir() {
+ // If it's a directory, add a watch for it so we see nested files.
+ if e.Op&fsnotify.Create != 0 {
+ klog.V(2).Infof("add watch for %v: %v", s.Name(), watcher.watchRecursive(e.Name))
+ }
+ }
+ // Can't stat a deleted directory, so attempt to remove it. If it fails it is not a problem.
+ if e.Op&fsnotify.Remove != 0 {
+ _ = watcher.Remove(e.Name)
+ }
+ if debounceC == nil {
+ debounceC = time.After(watchDebounceDelay)
+ }
+ case err := <-watcher.Errors:
+ klog.Errorf("Error watching file trigger: %v
%v", f.root, err)
+ return
+ case signal := <-stop:
+ klog.Infof("Shutting down file watcher: %v %v",
f.root, signal)
+ return
+ }
+ }
+ }()
+ return nil
+}
+
+func readSnapshot[F any, O any](w *FolderWatch[F], transform func(F) *O) []O {
+ res := w.get()
+ return slices.MapFilter(res, transform)
+}
+
+type recursiveWatcher struct {
+ *fsnotify.Watcher
+}
+
+func (m recursiveWatcher) watchRecursive(path string) error {
+ err := filepath.Walk(path, func(walkPath string, fi os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if fi.IsDir() {
+ if err = m.Watcher.Add(walkPath); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ return err
+}
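
Reviewer note: a sketch tying FolderWatch and FileCollection together: watch a directory of YAML files, parse each file into entries, and expose them as a krt collection that is Reset() on every debounced change. The directory, entry type, and parse logic are assumptions:

    type fileEntry struct {
        Name string
        Body string
    }

    // ResourceName gives each entry a stable key for the static collection.
    func (f fileEntry) ResourceName() string { return f.Name }

    stop := make(chan struct{})
    parse := func(data []byte) ([]fileEntry, error) {
        // Illustrative: one entry per file; real callers would unmarshal YAML.
        return []fileEntry{{Name: "todo", Body: string(data)}}, nil
    }
    w, err := files.NewFolderWatch[fileEntry]("/etc/dubbo/config", parse, stop)
    if err != nil {
        log.Fatal(err)
    }
    // transform keeps (or, by returning nil, filters out) each parsed entry.
    col := files.NewFileCollection[fileEntry, fileEntry](w, func(e fileEntry) *fileEntry { return &e })
    _ = col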
diff --git a/pkg/kube/krt/index.go b/pkg/kube/krt/index.go
index 5d873ea7..e94b355c 100644
--- a/pkg/kube/krt/index.go
+++ b/pkg/kube/krt/index.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/ptr"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "k8s.io/client-go/tools/cache"
)
type Index[K comparable, O any] interface {
@@ -42,6 +43,13 @@ func (i IndexObject[K, O]) ResourceName() string {
return toString(i.Key)
}
+// NewNamespaceIndex is a small helper to index a collection by namespace
+func NewNamespaceIndex[O Namespacer](c Collection[O]) Index[string, O] {
+ return NewIndex(c, cache.NamespaceIndex, func(o O) []string {
+ return []string{o.GetNamespace()}
+ })
+}
+
func NewIndex[K comparable, O any](
c Collection[O],
name string,
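
Reviewer note: a sketch connecting the new Namespacer interface (core.go above) with NewNamespaceIndex: any element type exposing GetNamespace can be indexed by namespace. The workloads collection is assumed to exist:

    type workload struct {
        Name      string
        Namespace string
    }

    func (w workload) ResourceName() string { return w.Namespace + "/" + w.Name }
    func (w workload) GetNamespace() string { return w.Namespace } // satisfies krt.Namespacer

    // Given some existing workloads krt.Collection[workload]:
    byNamespace := krt.NewNamespaceIndex(workloads)
    inDefault := byNamespace.Lookup("default") // all workloads in "default"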
diff --git a/pkg/kube/krt/static.go b/pkg/kube/krt/static.go
new file mode 100644
index 00000000..a0b386be
--- /dev/null
+++ b/pkg/kube/krt/static.go
@@ -0,0 +1,251 @@
+package krt
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/maps"
+ "github.com/apache/dubbo-kubernetes/pkg/ptr"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "sync"
+)
+
+type StaticCollection[T any] struct {
+ *staticList[T]
+}
+
+type staticList[T any] struct {
+ mu sync.RWMutex
+ vals map[string]T
+ eventHandlers *handlerSet[T]
+ id collectionUID
+ stop <-chan struct{}
+ collectionName string
+ syncer Syncer
+ metadata Metadata
+ indexes map[string]staticListIndex[T]
+}
+
+func (s *staticList[T]) Register(f func(o Event[T])) HandlerRegistration {
+ return registerHandlerAsBatched(s, f)
+}
+
+// nolint: unused // (not true, it's to implement an interface)
+func (s *staticList[T]) name() string {
+ return s.collectionName
+}
+
+// nolint: unused // (not true, it's to implement an interface)
+func (s *staticList[T]) uid() collectionUID {
+ return s.id
+}
+
+// nolint: unused // (not true, it's to implement an interface)
+func (s *staticList[T]) dump() CollectionDump {
+ return CollectionDump{
+ Outputs: eraseMap(slices.GroupUnique(s.List(), getTypedKey)),
+ Synced: s.HasSynced(),
+ }
+}
+
+func (s *staticList[T]) index(name string, extract func(o T) []string) indexer[T] {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if idx, ok := s.indexes[name]; ok {
+ return idx
+ }
+
+ idx := staticListIndex[T]{
+ extract: extract,
+ index: make(map[string]sets.Set[string]),
+ parent: s,
+ }
+
+ for k, v := range s.vals {
+ idx.update(Event[T]{
+ Old: nil,
+ New: &v,
+ Event: controllers.EventAdd,
+ }, k)
+ }
+ s.indexes[name] = idx
+
+ return idx
+}
+
+// nolint: unused // (not true, it's to implement an interface)
+func (s *staticList[T]) augment(a any) any {
+ return a
+}
+
+func (s *staticList[T]) HasSynced() bool {
+ return s.syncer.HasSynced()
+}
+
+func (s *staticList[T]) Synced() Syncer {
+ // We are always synced in the static collection since the initial state must be provided upfront
+ return alwaysSynced{}
+}
+
+func (s *staticList[T]) GetKey(k string) *T {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ if o, f := s.vals[k]; f {
+ return &o
+ }
+ return nil
+}
+
+func (s *staticList[T]) Metadata() Metadata {
+ return s.metadata
+}
+
+func (s *staticList[T]) WaitUntilSynced(stop <-chan struct{}) bool {
+ return s.syncer.WaitUntilSynced(stop)
+}
+
+func (s *staticList[T]) RegisterBatch(f func(o []Event[T]), runExistingState
bool) HandlerRegistration {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ var objs []Event[T]
+ if runExistingState {
+ for _, v := range s.vals {
+ objs = append(objs, Event[T]{
+ New: &v,
+ Event: controllers.EventAdd,
+ })
+ }
+ }
+ return s.eventHandlers.Insert(f, s.Synced(), objs, s.stop)
+}
+
+func (s *staticList[T]) List() []T {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return maps.Values(s.vals)
+}
+
+func NewStaticCollection[T any](synced Syncer, vals []T, opts
...CollectionOption) StaticCollection[T] {
+ o := buildCollectionOptions(opts...)
+ if o.name == "" {
+ o.name = fmt.Sprintf("Static[%v]", ptr.TypeName[T]())
+ }
+
+ res := make(map[string]T, len(vals))
+ for _, v := range vals {
+ res[GetKey(v)] = v
+ }
+
+ if synced == nil {
+ synced = alwaysSynced{}
+ }
+
+ sl := &staticList[T]{
+ eventHandlers: newHandlerSet[T](),
+ vals: res,
+ id: nextUID(),
+ stop: o.stop,
+ collectionName: o.name,
+ syncer: synced,
+ indexes: make(map[string]staticListIndex[T]),
+ }
+
+ if o.metadata != nil {
+ sl.metadata = o.metadata
+ }
+
+ c := StaticCollection[T]{
+ staticList: sl,
+ }
+ maybeRegisterCollectionForDebugging[T](c, o.debugger)
+ return c
+}
+
+func (s StaticCollection[T]) Reset(newState []T) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ var updates []Event[T]
+ nv := map[string]T{}
+ for _, incoming := range newState {
+ k := GetKey(incoming)
+ nv[k] = incoming
+ if old, f := s.vals[k]; f {
+ if !Equal(old, incoming) {
+ ev := Event[T]{
+ Old: &old,
+ New: &incoming,
+ Event: controllers.EventUpdate,
+ }
+ for _, index := range s.indexes {
+ index.update(ev, k)
+ }
+ updates = append(updates, ev)
+ }
+ } else {
+ ev := Event[T]{
+ New: &incoming,
+ Event: controllers.EventAdd,
+ }
+ for _, index := range s.indexes {
+ index.update(ev, k)
+ }
+ updates = append(updates, ev)
+ }
+ delete(s.vals, k)
+ }
+ for k, remaining := range s.vals {
+ for _, index := range s.indexes {
+ index.delete(remaining, k)
+ }
+ updates = append(updates, Event[T]{
+ Old: &remaining,
+ Event: controllers.EventDelete,
+ })
+ }
+ s.vals = nv
+ if len(updates) > 0 {
+ s.eventHandlers.Distribute(updates, false)
+ }
+}
+
+// nolint: unused // (not true)
+type staticListIndex[T any] struct {
+ extract func(o T) []string
+ index map[string]sets.Set[string]
+ parent *staticList[T]
+}
+
+func (s staticListIndex[T]) update(ev Event[T], oKey string) {
+ if ev.Old != nil {
+ s.delete(*ev.Old, oKey)
+ }
+ if ev.New != nil {
+ newIndexKeys := s.extract(*ev.New)
+ for _, newIndexKey := range newIndexKeys {
+ sets.InsertOrNew(s.index, newIndexKey, oKey)
+ }
+ }
+}
+
+func (s staticListIndex[T]) delete(o T, oKey string) {
+ oldIndexKeys := s.extract(o)
+ for _, oldIndexKey := range oldIndexKeys {
+ sets.DeleteCleanupLast(s.index, oldIndexKey, oKey)
+ }
+}
+
+func (s staticListIndex[T]) Lookup(key string) []T {
+ s.parent.mu.RLock()
+ defer s.parent.mu.RUnlock()
+ keys := s.index[key]
+
+ res := make([]T, 0, len(keys))
+ for k := range keys {
+ v, f := s.parent.vals[k]
+ if !f {
+ continue
+ }
+ res = append(res, v)
+ }
+ return res
+}
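
A rough same-package sketch of the Reset semantics (the endpoint type is illustrative, and keying via ResourceName() is an assumption; imports elided):

    type endpoint struct{ name string }

    func (e endpoint) ResourceName() string { return e.name }

    sc := NewStaticCollection[endpoint](nil, []endpoint{{"a"}, {"b"}})
    sc.Register(func(ev Event[endpoint]) {
        fmt.Println(ev.Event) // add, update, or delete
    })
    // Reset diffs the old state against the new state and emits events:
    // "a" is deleted, "b" is unchanged, "c" is added.
    sc.Reset([]endpoint{{"b"}, {"c"}})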
diff --git a/pkg/kube/krt/sync.go b/pkg/kube/krt/sync.go
index acf75dba..f5d3b0f3 100644
--- a/pkg/kube/krt/sync.go
+++ b/pkg/kube/krt/sync.go
@@ -82,3 +82,13 @@ func (c multiSyncer) HasSynced() bool {
}
return true
}
+
+type alwaysSynced struct{}
+
+func (c alwaysSynced) WaitUntilSynced(stop <-chan struct{}) bool {
+ return true
+}
+
+func (c alwaysSynced) HasSynced() bool {
+ return true
+}
diff --git a/pkg/model/xds.go b/pkg/model/xds.go
new file mode 100644
index 00000000..c5eccbf1
--- /dev/null
+++ b/pkg/model/xds.go
@@ -0,0 +1,11 @@
+package model
+
+const (
+ APITypePrefix = "type.googleapis.com/"
+ ClusterType = APITypePrefix + "envoy.config.cluster.v3.Cluster"
+ ListenerType = APITypePrefix + "envoy.config.listener.v3.Listener"
+ EndpointType = APITypePrefix +
"envoy.config.endpoint.v3.ClusterLoadAssignment"
+ RouteType = APITypePrefix +
"envoy.config.route.v3.RouteConfiguration"
+
+ DebugType = "dubbo.io/debug"
+)
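
These type URLs are what an xDS client puts on the wire to say which resource kind it wants; a sketch using the go-control-plane types (assumed available as a dependency via pkg/adsc):

    req := &discoveryv3.DiscoveryRequest{
        // Asks the server for Cluster resources.
        TypeUrl: model.ClusterType,
    }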
diff --git a/pkg/security/security.go b/pkg/security/security.go
index cbeafbd6..84ab7b62 100644
--- a/pkg/security/security.go
+++ b/pkg/security/security.go
@@ -20,6 +20,15 @@ package security
import (
"context"
"net/http"
+ "time"
+)
+
+const (
+	// RootCertReqResourceName is the resource name of the discovery request
+	// for the root certificate.
+	RootCertReqResourceName = "ROOTCA"
+	// WorkloadKeyCertResourceName is the resource name of the discovery
+	// request for workload identity.
+	WorkloadKeyCertResourceName = "default"
)
type AuthContext struct {
@@ -34,6 +43,33 @@ type Authenticator interface {
AuthenticatorType() string
}
+// SecretItem is the cached item in the in-memory secret store.
+type SecretItem struct {
+ CertificateChain []byte
+ PrivateKey []byte
+
+ RootCert []byte
+
+ // ResourceName passed from envoy SDS discovery request.
+ // "ROOTCA" for root cert request, "default" for key/cert request.
+ ResourceName string
+
+ CreatedTime time.Time
+
+ ExpireTime time.Time
+}
+
+// SecretManager defines the secrets management interface used by SDS.
+type SecretManager interface {
+	// GenerateSecret generates a new secret for the given resource.
+	//
+	// The current implementation also watches the generated secret and
+	// triggers a callback when it is near expiry. It constructs the SAN
+	// based on the token's 'sub' claim, which is expected to be in the
+	// K8S format. No other JWTs are currently supported due to client
+	// logic. If the JWT is missing or invalid, the resourceName is used.
+ GenerateSecret(resourceName string) (*SecretItem, error)
+}
+
type AuthSource int
type KubernetesInfo struct {
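
A caller-side sketch of how the expiry fields might drive rotation (the 10% grace window is an illustrative assumption, not part of this patch):

    func shouldRotate(item *SecretItem, now time.Time) bool {
        lifetime := item.ExpireTime.Sub(item.CreatedTime)
        // Rotate once we are within the last 10% of the cert lifetime.
        return now.After(item.ExpireTime.Add(-lifetime / 10))
    }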
diff --git a/pkg/wellknown/wellknown.go b/pkg/wellknown/wellknown.go
new file mode 100644
index 00000000..12d50e16
--- /dev/null
+++ b/pkg/wellknown/wellknown.go
@@ -0,0 +1,13 @@
+// Package wellknown contains common names for filters, listeners, etc.,
+// copied from envoyproxy/go-control-plane.
+//
+// TODO: remove this package
+package wellknown
+
+// Network filter names
+const (
+ // HTTPConnectionManager network filter
+ HTTPConnectionManager = "envoy.filters.network.http_connection_manager"
+ // TCPProxy network filter
+ TCPProxy = "envoy.filters.network.tcp_proxy"
+)
diff --git a/sail/cmd/sail-discovery/app/cmd.go
b/sail/cmd/sail-discovery/app/cmd.go
index e4ff4834..09e29571 100644
--- a/sail/cmd/sail-discovery/app/cmd.go
+++ b/sail/cmd/sail-discovery/app/cmd.go
@@ -24,7 +24,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/ctrlz"
"github.com/apache/dubbo-kubernetes/sail/pkg/bootstrap"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
- "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/providers"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
"github.com/spf13/cobra"
)
@@ -88,9 +88,9 @@ func addFlags(c *cobra.Command) {
p.CtrlZOptions = ctrlz.DefaultOptions()
})
c.PersistentFlags().StringSliceVar(&serverArgs.RegistryOptions.Registries,
"registries",
- []string{string(providers.Kubernetes)},
+ []string{string(provider.Kubernetes)},
fmt.Sprintf("Comma separated list of platform service
registries to read from (choose one or more from {%s})",
- providers.Kubernetes))
+ provider.Kubernetes))
c.PersistentFlags().StringVar(&serverArgs.RegistryOptions.ClusterRegistriesNamespace,
"clusterRegistriesNamespace",
serverArgs.RegistryOptions.ClusterRegistriesNamespace,
"Namespace for ConfigMap which stores clusters configs")
c.PersistentFlags().StringVar(&serverArgs.RegistryOptions.KubeConfig,
"kubeconfig", "",
diff --git a/sail/pkg/bootstrap/configcontroller.go
b/sail/pkg/bootstrap/configcontroller.go
index 0ab23e87..af68ff89 100644
--- a/sail/pkg/bootstrap/configcontroller.go
+++ b/sail/pkg/bootstrap/configcontroller.go
@@ -17,8 +17,265 @@
package bootstrap
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "encoding/pem"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/adsc"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+ configaggregate
"github.com/apache/dubbo-kubernetes/sail/pkg/config/aggregate"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/file"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/config/memory"
+ dubboCredentials
"github.com/apache/dubbo-kubernetes/sail/pkg/credentials"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/credentials/kube"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/credentials/insecure"
+ "istio.io/api/networking/v1alpha3"
+ v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/klog/v2"
+ "net/url"
+ "strings"
+)
+
type ConfigSourceAddressScheme string
const (
+ File ConfigSourceAddressScheme = "fs"
+ XDS ConfigSourceAddressScheme = "xds"
Kubernetes ConfigSourceAddressScheme = "k8s"
)
+
+func (s *Server) initConfigController(args *SailArgs) error {
+ meshConfig := s.environment.Mesh()
+ if len(meshConfig.ConfigSources) > 0 {
+ // Using MCP for config.
+ if err := s.initConfigSources(args); err != nil {
+ return err
+ }
+ } else if args.RegistryOptions.FileDir != "" {
+ // Local files - should be added even if other options are
specified
+ configController, err := file.NewController(
+ args.RegistryOptions.FileDir,
+ args.RegistryOptions.KubeOptions.DomainSuffix,
+ collections.Sail,
+ args.RegistryOptions.KubeOptions,
+ )
+ if err != nil {
+ return err
+ }
+ s.ConfigStores = append(s.ConfigStores, configController)
+ } else {
+ err := s.initK8SConfigStore(args)
+ if err != nil {
+ return err
+ }
+ }
+
+ // TODO ingress controller
+ // TODO addTerminatingStartFunc
+
+ // Wrap the config controller with a cache.
+ aggregateConfigController, err :=
configaggregate.MakeCache(s.ConfigStores)
+ if err != nil {
+ return err
+ }
+ s.configController = aggregateConfigController
+
+ // Create the config store.
+ s.environment.ConfigStore = aggregateConfigController
+
+ // Defer starting the controller until after the service is created.
+ s.addStartFunc("config controller", func(stop <-chan struct{}) error {
+ go s.configController.Run(stop)
+ return nil
+ })
+
+ return nil
+}
+
+// initConfigSources will process mesh config 'configSources' and initialize
+// associated configs.
+func (s *Server) initConfigSources(args *SailArgs) (err error) {
+ for _, configSource := range s.environment.Mesh().ConfigSources {
+ srcAddress, err := url.Parse(configSource.Address)
+ if err != nil {
+ return fmt.Errorf("invalid config URL %s %v",
configSource.Address, err)
+ }
+ scheme := ConfigSourceAddressScheme(srcAddress.Scheme)
+ switch scheme {
+ case File:
+ if srcAddress.Path == "" {
+ return fmt.Errorf("invalid fs config URL %s,
contains no file path", configSource.Address)
+ }
+
+ configController, err := file.NewController(
+ srcAddress.Path,
+ args.RegistryOptions.KubeOptions.DomainSuffix,
+ collections.Sail,
+ args.RegistryOptions.KubeOptions,
+ )
+ if err != nil {
+ return err
+ }
+ s.ConfigStores = append(s.ConfigStores,
configController)
+ klog.Infof("Started File configSource %s",
configSource.Address)
+ case XDS:
+ transportCredentials, err :=
s.getTransportCredentials(args, configSource.TlsSettings)
+ if err != nil {
+ return fmt.Errorf("failed to read transport
credentials from config: %v", err)
+ }
+ xdsMCP, err := adsc.New(srcAddress.Host,
&adsc.ADSConfig{
+ InitialDiscoveryRequests:
adsc.ConfigInitialRequests(),
+ Config: adsc.Config{
+ Namespace: args.Namespace,
+ Workload: args.PodName,
+ Revision: "", // TODO
+ Meta: nil,
+ GrpcOpts: []grpc.DialOption{
+
args.KeepaliveOptions.ConvertToClientOption(),
+
grpc.WithTransportCredentials(transportCredentials),
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("failed to dial XDS %s %v",
configSource.Address, err)
+ }
+ store := memory.Make(collections.Sail)
+ // TODO: enable namespace filter for memory controller
+ configController := memory.NewController(store)
+
configController.RegisterHasSyncedHandler(xdsMCP.HasSynced)
+ xdsMCP.Store = configController
+ err = xdsMCP.Run()
+ if err != nil {
+ return fmt.Errorf("MCP: failed running %v", err)
+ }
+ s.ConfigStores = append(s.ConfigStores,
configController)
+ klog.Infof("Started XDS configSource %s",
configSource.Address)
+ case Kubernetes:
+ if srcAddress.Path == "" || srcAddress.Path == "/" {
+ err2 := s.initK8SConfigStore(args)
+ if err2 != nil {
+ klog.Errorf("Error loading k8s: %v",
err2)
+ return err2
+ }
+ klog.Infof("Started Kubernetes configSource
%s", configSource.Address)
+ } else {
+ klog.Infof("Not implemented, ignore: %v",
configSource.Address)
+ // TODO: handle k8s:// scheme for remote
cluster. Use same mechanism as service registry,
+ // using the cluster name as key to match a
secret.
+ }
+ default:
+ klog.Infof("Ignoring unsupported config source: %v",
configSource.Address)
+ }
+ }
+ return nil
+}
+
+func (s *Server) initK8SConfigStore(args *SailArgs) error {
+ if s.kubeClient == nil {
+ return nil
+ }
+ // TODO
+ return nil
+}
+
+// getTransportCredentials attempts to create credentials.TransportCredentials
+// from the ClientTLSSettings in mesh config.
+// Implemented only for the SIMPLE TLS mode.
+//
+// TODO: implement the MUTUAL_TLS/DUBBO_MUTUAL_TLS modes.
+func (s *Server) getTransportCredentials(args *SailArgs, tlsSettings
*v1alpha3.ClientTLSSettings) (credentials.TransportCredentials, error) {
+ // TODO ValidateTLS
+
+ switch tlsSettings.GetMode() {
+ case v1alpha3.ClientTLSSettings_SIMPLE:
+ if len(tlsSettings.GetCredentialName()) > 0 {
+ rootCert, err :=
s.getRootCertFromSecret(tlsSettings.GetCredentialName(), args.Namespace)
+ if err != nil {
+ return nil, err
+ }
+ tlsSettings.CaCertificates = string(rootCert.Cert)
+ tlsSettings.CaCrl = string(rootCert.CRL)
+ }
+ if tlsSettings.GetInsecureSkipVerify().GetValue() ||
len(tlsSettings.GetCaCertificates()) == 0 {
+ return credentials.NewTLS(&tls.Config{
+ ServerName: tlsSettings.GetSni(),
+ InsecureSkipVerify:
tlsSettings.GetInsecureSkipVerify().GetValue(), // nolint
+ }), nil
+ }
+ certPool := x509.NewCertPool()
+ if
!certPool.AppendCertsFromPEM([]byte(tlsSettings.GetCaCertificates())) {
+ return nil, fmt.Errorf("failed to add ca certificate
from configSource.tlsSettings to pool")
+ }
+ return credentials.NewTLS(&tls.Config{
+ ServerName: tlsSettings.GetSni(),
+ InsecureSkipVerify:
tlsSettings.GetInsecureSkipVerify().GetValue(), // nolint
+ RootCAs: certPool,
+ VerifyPeerCertificate: func(rawCerts [][]byte,
verifiedChains [][]*x509.Certificate) error {
+ return s.verifyCert(rawCerts, tlsSettings)
+ },
+ }), nil
+ default:
+ return insecure.NewCredentials(), nil
+ }
+}
+
+// verifyCert verifies the given cert against TLS settings such as SANs and the CRL.
+func (s *Server) verifyCert(certs [][]byte, tlsSettings
*v1alpha3.ClientTLSSettings) error {
+ if len(certs) == 0 {
+ return fmt.Errorf("no certificates provided")
+ }
+ cert, err := x509.ParseCertificate(certs[0])
+ if err != nil {
+ return fmt.Errorf("failed to parse certificate: %w", err)
+ }
+
+ if len(tlsSettings.SubjectAltNames) > 0 {
+ sanMatchFound := false
+ for _, san := range cert.DNSNames {
+ if sanMatchFound {
+ break
+ }
+ for _, name := range tlsSettings.SubjectAltNames {
+ if san == name {
+ sanMatchFound = true
+ break
+ }
+ }
+ }
+ if !sanMatchFound {
+ return fmt.Errorf("no matching SAN found")
+ }
+ }
+
+ if len(tlsSettings.CaCrl) > 0 {
+ crlData := []byte(strings.TrimSpace(tlsSettings.CaCrl))
+ block, _ := pem.Decode(crlData)
+ if block != nil {
+ crlData = block.Bytes
+ }
+ crl, err := x509.ParseRevocationList(crlData)
+ if err != nil {
+ return fmt.Errorf("failed to parse CRL: %w", err)
+ }
+ for _, revokedCert := range crl.RevokedCertificateEntries {
+ if cert.SerialNumber.Cmp(revokedCert.SerialNumber) == 0
{
+ return fmt.Errorf("certificate is revoked")
+ }
+ }
+ }
+
+ return nil
+}
+
+// getRootCertFromSecret fetches the secret `name` in `namespace` and
+// extracts its root certificate info.
+func (s *Server) getRootCertFromSecret(name, namespace string)
(*dubboCredentials.CertInfo, error) {
+ secret, err :=
s.kubeClient.Kube().CoreV1().Secrets(namespace).Get(context.Background(), name,
v1.GetOptions{})
+ if err != nil {
+ return nil, fmt.Errorf("failed to get credential with name %v:
%v", name, err)
+ }
+ return kube.ExtractRoot(secret.Data)
+}
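
For orientation, the scheme dispatch in initConfigSources reduces to the following sketch (the address value is illustrative):

    u, err := url.Parse("xds://mcp-server:15010")
    if err != nil {
        return err
    }
    switch ConfigSourceAddressScheme(u.Scheme) {
    case File: // u.Path is watched on disk
    case XDS: // u.Host is dialed over gRPC
    case Kubernetes: // the in-cluster client is used
    }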
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index 00e4a551..f95cc434 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -34,7 +34,7 @@ import (
"github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/server"
- "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/providers"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
"github.com/apache/dubbo-kubernetes/sail/pkg/xds"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
@@ -53,21 +53,28 @@ import (
)
type Server struct {
- XDSServer *xds.DiscoveryServer
- clusterID cluster.ID
- environment *model.Environment
- server server.Instance
- kubeClient kubelib.Client
- grpcServer *grpc.Server
- grpcAddress string
- secureGrpcServer *grpc.Server
- secureGrpcAddress string
- httpServer *http.Server // debug, monitoring and readiness
Server.
- httpAddr string
- httpsServer *http.Server // webhooks HTTPS Server.
- httpsAddr string
- httpMux *http.ServeMux
- httpsMux *http.ServeMux // webhooks
+ XDSServer *xds.DiscoveryServer
+ clusterID cluster.ID
+ environment *model.Environment
+ server server.Instance
+ kubeClient kubelib.Client
+
+ grpcServer *grpc.Server
+ grpcAddress string
+
+ secureGrpcServer *grpc.Server
+ secureGrpcAddress string
+
+ httpServer *http.Server // debug, monitoring and readiness Server.
+ httpAddr string
+ httpsServer *http.Server // webhooks HTTPS Server.
+ httpsAddr string
+ httpMux *http.ServeMux
+ httpsMux *http.ServeMux // webhooks
+
+ ConfigStores []model.ConfigStoreController
+ configController model.ConfigStoreController
+
fileWatcher filewatcher.FileWatcher
internalStop chan struct{}
shutdownDuration time.Duration
@@ -146,6 +153,10 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server))
(*Server, error) {
return nil, err
}
+ if err := s.initControllers(args); err != nil {
+ return nil, err
+ }
+
return s, nil
}
@@ -289,6 +300,22 @@ func (s *Server) initServers(args *SailArgs) {
func (s *Server) initGrpcServer(options *dubbokeepalive.Options) {
}
+// initControllers initializes the controllers.
+func (s *Server) initControllers(args *SailArgs) error {
+ klog.Info("initializing controllers")
+ // TODO initMulticluster
+
+ // TODO initSDSServer
+
+ if err := s.initConfigController(args); err != nil {
+ return fmt.Errorf("error initializing config controller: %v",
err)
+ }
+ if err := s.initServiceControllers(args); err != nil {
+ return fmt.Errorf("error initializing service controllers: %v",
err)
+ }
+ return nil
+}
+
func (s *Server) serveHTTP() error {
// At this point we are ready - start Http Listener so that it can
respond to readiness events.
httpListener, err := net.Listen("tcp", s.httpServer.Addr)
@@ -333,11 +360,17 @@ func (s *Server) maybeCreateCA(caOpts *caOptions) error {
return nil
}
+// addStartFunc appends a function to be run. These are run synchronously in
+// order, so the function should start a goroutine if it needs to do anything
+// blocking.
+func (s *Server) addStartFunc(name string, fn server.Component) {
+ s.server.RunComponent(name, fn)
+}
+
func getClusterID(args *SailArgs) cluster.ID {
clusterID := args.RegistryOptions.KubeOptions.ClusterID
if clusterID == "" {
if hasKubeRegistry(args.RegistryOptions.Registries) {
- clusterID = cluster.ID(providers.Kubernetes)
+ clusterID = cluster.ID(provider.Kubernetes)
}
}
return clusterID
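
A sketch of the addStartFunc contract described above (serveMetrics is an assumed helper that blocks until stop closes):

    s.addStartFunc("metrics", func(stop <-chan struct{}) error {
        go serveMetrics(stop) // blocking work belongs in its own goroutine
        return nil
    })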
diff --git a/sail/pkg/bootstrap/servicecontroller.go
b/sail/pkg/bootstrap/servicecontroller.go
new file mode 100644
index 00000000..44e493a2
--- /dev/null
+++ b/sail/pkg/bootstrap/servicecontroller.go
@@ -0,0 +1,62 @@
+package bootstrap
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+ "k8s.io/klog/v2"
+)
+
+func (s *Server) ServiceController() *aggregate.Controller {
+ return s.environment.ServiceDiscovery.(*aggregate.Controller)
+}
+
+// initServiceControllers creates and initializes the service controllers
+func (s *Server) initServiceControllers(args *SailArgs) error {
+ serviceControllers := s.ServiceController()
+
+ // TODO service entry controller
+
+ registered := sets.New[provider.ID]()
+ for _, r := range args.RegistryOptions.Registries {
+ serviceRegistry := provider.ID(r)
+ if registered.Contains(serviceRegistry) {
+ klog.Infof("%s registry specified multiple times.", r)
+ continue
+ }
+ registered.Insert(serviceRegistry)
+ klog.Infof("Adding %s registry adapter", serviceRegistry)
+ switch serviceRegistry {
+ case provider.Kubernetes:
+ if err := s.initKubeRegistry(args); err != nil {
+ return err
+ }
+ default:
+ return fmt.Errorf("service registry %s is not
supported", r)
+ }
+ }
+
+ // Defer running of the service controllers.
+ s.addStartFunc("service controllers", func(stop <-chan struct{}) error {
+ go serviceControllers.Run(stop)
+ return nil
+ })
+
+ return nil
+}
+
+// initKubeRegistry creates all the k8s service controllers under this discovery server
+func (s *Server) initKubeRegistry(args *SailArgs) (err error) {
+ args.RegistryOptions.KubeOptions.ClusterID = s.clusterID
+	// TODO revision
+ args.RegistryOptions.KubeOptions.KrtDebugger = args.KrtDebugger
+ // TODO metrics
+ args.RegistryOptions.KubeOptions.XDSUpdater = s.XDSServer
+ args.RegistryOptions.KubeOptions.MeshNetworksWatcher =
s.environment.NetworksWatcher
+ args.RegistryOptions.KubeOptions.MeshWatcher = s.environment.Watcher
+ args.RegistryOptions.KubeOptions.SystemNamespace = args.Namespace
+ args.RegistryOptions.KubeOptions.MeshServiceController =
s.ServiceController()
+ // TODO NewMulticluster
+ return
+}
diff --git a/sail/pkg/bootstrap/util.go b/sail/pkg/bootstrap/util.go
index a4519a93..9d4a15e4 100644
--- a/sail/pkg/bootstrap/util.go
+++ b/sail/pkg/bootstrap/util.go
@@ -18,12 +18,12 @@
package bootstrap
import (
- "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/providers"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
)
func hasKubeRegistry(registries []string) bool {
for _, r := range registries {
- if providers.ID(r) == providers.Kubernetes {
+ if provider.ID(r) == provider.Kubernetes {
return true
}
}
diff --git a/sail/pkg/config/aggregate/config.go
b/sail/pkg/config/aggregate/config.go
new file mode 100644
index 00000000..16dfd442
--- /dev/null
+++ b/sail/pkg/config/aggregate/config.go
@@ -0,0 +1,189 @@
+// Package aggregate implements a read-only aggregator for config stores.
+package aggregate
+
+import (
+ "errors"
+
+ "k8s.io/apimachinery/pkg/types"
+
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+)
+
+var errorUnsupported = errors.New("unsupported operation: the config
aggregator is read-only")
+
+// makeStore creates an aggregate config store from several config stores and
+// unifies their descriptors
+func makeStore(stores []model.ConfigStore, writer model.ConfigStore)
(model.ConfigStore, error) {
+ union := collection.NewSchemasBuilder()
+ storeTypes := make(map[config.GroupVersionKind][]model.ConfigStore)
+ for _, store := range stores {
+ for _, s := range store.Schemas().All() {
+ if len(storeTypes[s.GroupVersionKind()]) == 0 {
+ if err := union.Add(s); err != nil {
+ return nil, err
+ }
+ }
+ storeTypes[s.GroupVersionKind()] =
append(storeTypes[s.GroupVersionKind()], store)
+ }
+ }
+
+ schemas := union.Build()
+ if err := schemas.Validate(); err != nil {
+ return nil, err
+ }
+ result := &store{
+ schemas: schemas,
+ stores: storeTypes,
+ writer: writer,
+ }
+
+ return result, nil
+}
+
+// MakeWriteableCache creates an aggregate config store cache from several
config store caches. An additional
+// `writer` config store is passed, which may or may not be part of `caches`.
+func MakeWriteableCache(caches []model.ConfigStoreController, writer
model.ConfigStore) (model.ConfigStoreController, error) {
+ stores := make([]model.ConfigStore, 0, len(caches))
+ for _, cache := range caches {
+ stores = append(stores, cache)
+ }
+ store, err := makeStore(stores, writer)
+ if err != nil {
+ return nil, err
+ }
+ return &storeCache{
+ ConfigStore: store,
+ caches: caches,
+ }, nil
+}
+
+// MakeCache creates an aggregate config store cache from several config store
+// caches.
+func MakeCache(caches []model.ConfigStoreController)
(model.ConfigStoreController, error) {
+ return MakeWriteableCache(caches, nil)
+}
+
+type store struct {
+	// schemas is the union of the schemas exposed by all underlying stores
+ schemas collection.Schemas
+
+ // stores is a mapping from config type to a store
+ stores map[config.GroupVersionKind][]model.ConfigStore
+
+ writer model.ConfigStore
+}
+
+func (cr *store) Schemas() collection.Schemas {
+ return cr.schemas
+}
+
+// Get the first config found in the stores.
+func (cr *store) Get(typ config.GroupVersionKind, name, namespace string)
*config.Config {
+ for _, store := range cr.stores[typ] {
+ config := store.Get(typ, name, namespace)
+ if config != nil {
+ return config
+ }
+ }
+ return nil
+}
+
+// List all configs in the stores.
+func (cr *store) List(typ config.GroupVersionKind, namespace string)
[]config.Config {
+ stores := cr.stores[typ]
+ if len(stores) == 0 {
+ return nil
+ }
+
+ var (
+ configs []config.Config
+ storeConfigs = make([][]config.Config, 0, len(stores))
+ configCnt int
+ )
+
+ for _, store := range stores {
+ curConfigs := store.List(typ, namespace)
+ storeConfigs = append(storeConfigs, curConfigs)
+ configCnt += len(curConfigs)
+ }
+
+ configs = make([]config.Config, 0, configCnt)
+ // Used to remove duplicated config
+ configMap := sets.NewWithLength[types.NamespacedName](configCnt)
+ for _, curConfigs := range storeConfigs {
+ configs = append(configs, curConfigs...)
+ }
+ configs = slices.FilterInPlace[config.Config](configs, func(cfg
config.Config) bool {
+ return !configMap.InsertContains(cfg.NamespacedName())
+ })
+
+ return configs
+}
+
+func (cr *store) Delete(typ config.GroupVersionKind, name, namespace string,
resourceVersion *string) error {
+ if cr.writer == nil {
+ return errorUnsupported
+ }
+ return cr.writer.Delete(typ, name, namespace, resourceVersion)
+}
+
+func (cr *store) Create(c config.Config) (string, error) {
+ if cr.writer == nil {
+ return "", errorUnsupported
+ }
+ return cr.writer.Create(c)
+}
+
+func (cr *store) Update(c config.Config) (string, error) {
+ if cr.writer == nil {
+ return "", errorUnsupported
+ }
+ return cr.writer.Update(c)
+}
+
+func (cr *store) UpdateStatus(c config.Config) (string, error) {
+ if cr.writer == nil {
+ return "", errorUnsupported
+ }
+ return cr.writer.UpdateStatus(c)
+}
+
+func (cr *store) Patch(orig config.Config, patchFn config.PatchFunc) (string,
error) {
+ if cr.writer == nil {
+ return "", errorUnsupported
+ }
+ return cr.writer.Patch(orig, patchFn)
+}
+
+type storeCache struct {
+ model.ConfigStore
+ caches []model.ConfigStoreController
+}
+
+func (cr *storeCache) HasSynced() bool {
+ for _, cache := range cr.caches {
+ if !cache.HasSynced() {
+ return false
+ }
+ }
+ return true
+}
+
+func (cr *storeCache) RegisterEventHandler(kind config.GroupVersionKind,
handler model.EventHandler) {
+ for _, cache := range cr.caches {
+ if _, exists := cache.Schemas().FindByGroupVersionKind(kind);
exists {
+ cache.RegisterEventHandler(kind, handler)
+ }
+ }
+}
+
+func (cr *storeCache) Run(stop <-chan struct{}) {
+ for _, cache := range cr.caches {
+ go cache.Run(stop)
+ }
+ <-stop
+}
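
A usage sketch for the aggregator (fileStore, memStore, cfg, and stop are assumed values):

    agg, err := aggregate.MakeCache([]model.ConfigStoreController{fileStore, memStore})
    if err != nil {
        return err
    }
    go agg.Run(stop)
    // Reads fan out across the stores; writes are rejected because no
    // writer store was supplied to MakeCache.
    if _, err := agg.Create(cfg); err != nil {
        // "unsupported operation: the config aggregator is read-only"
    }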
diff --git a/sail/pkg/config/kube/crd/config.go
b/sail/pkg/config/kube/crd/config.go
new file mode 100644
index 00000000..7b963013
--- /dev/null
+++ b/sail/pkg/config/kube/crd/config.go
@@ -0,0 +1,13 @@
+package crd
+
+import (
+ "encoding/json"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+type DubboKind struct {
+ metav1.TypeMeta
+ metav1.ObjectMeta `json:"metadata"`
+ Spec json.RawMessage `json:"spec"`
+ Status *json.RawMessage `json:"status,omitempty"`
+}
diff --git a/sail/pkg/config/kube/crd/conversion.go
b/sail/pkg/config/kube/crd/conversion.go
new file mode 100644
index 00000000..66b12cb7
--- /dev/null
+++ b/sail/pkg/config/kube/crd/conversion.go
@@ -0,0 +1,57 @@
+package crd
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/resource"
+ "io"
+ kubeyaml "k8s.io/apimachinery/pkg/util/yaml"
+ "reflect"
+)
+
+type ConversionFunc = func(s resource.Schema, js string) (config.Spec, error)
+
+// TODO - add special cases for type-to-kind and kind-to-type
+// conversions with initial-isms. Consider adding additional type
+// information to the abstract model and/or elevating k8s
+// representation to first-class type to avoid extra conversions.
+
+func parseInputsImpl(inputs string, withValidate bool) ([]config.Config,
[]DubboKind, error) {
+ var varr []config.Config
+ var others []DubboKind
+ reader := bytes.NewReader([]byte(inputs))
+ empty := DubboKind{}
+
+	// We store configs as a YAML stream; there may be more than one document.
+ yamlDecoder := kubeyaml.NewYAMLOrJSONDecoder(reader, 512*1024)
+ for {
+ obj := DubboKind{}
+ err := yamlDecoder.Decode(&obj)
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, nil, fmt.Errorf("cannot parse proto
message: %v", err)
+ }
+ if reflect.DeepEqual(obj, empty) {
+ continue
+ }
+
+ // TODO GatewayAPI
+ }
+
+ return varr, others, nil
+}
+
+// ParseInputs reads multiple documents from `kubectl` output and checks with
+// the schema. It also returns the list of unrecognized kinds as the second
+// response.
+//
+// NOTE: This function only decodes a subset of the complete k8s
+// ObjectMeta as identified by the fields in model.Meta. This
+// would typically only be a problem if a user dumps a configuration
+// object with kubectl and then re-ingests it.
+func ParseInputs(inputs string) ([]config.Config, []DubboKind, error) {
+ return parseInputsImpl(inputs, true)
+}
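
A minimal call sketch (multiDocYAML is an assumed multi-document stream; note the kind-specific conversion above is still a TODO, so both slices currently come back empty):

    configs, unrecognized, err := crd.ParseInputs(multiDocYAML)
    if err != nil {
        return err // one of the documents failed to decode
    }
    _, _ = configs, unrecognized // recognized kinds vs. unknown kinds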
diff --git a/sail/pkg/config/kube/file/controller.go
b/sail/pkg/config/kube/file/controller.go
new file mode 100644
index 00000000..1a4ffa2a
--- /dev/null
+++ b/sail/pkg/config/kube/file/controller.go
@@ -0,0 +1,192 @@
+package file
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/crd"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ krtfiles "github.com/apache/dubbo-kubernetes/pkg/kube/krt/files"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ kubecontroller
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/kube/controller"
+)
+
+var errUnsupportedOp = fmt.Errorf("unsupported operation: the file config
store is a read-only view")
+
+type kindStore struct {
+ collection krt.Collection[config.Config]
+ index krt.Index[string, config.Config]
+ handlers []krt.HandlerRegistration
+}
+
+type Controller struct {
+ data map[config.GroupVersionKind]kindStore
+ schemas collection.Schemas
+ stop chan struct{}
+}
+
+type ConfigKind struct {
+ *config.Config
+}
+
+func (c ConfigKind) ResourceName() string {
+ if c.Namespace == "" {
+ return c.GroupVersionKind.String() + "/" + c.Name
+ }
+
+	return c.GroupVersionKind.String() + "/" + c.Namespace + "/" + c.Name
+}
+
+func (c ConfigKind) Equals(other ConfigKind) bool {
+ return c.Config.Equals(other.Config)
+}
+
+func NewController(
+ fileDir string,
+ domainSuffix string,
+ schemas collection.Schemas,
+ options kubecontroller.Options,
+) (*Controller, error) {
+ stop := make(chan struct{})
+ opts := krt.NewOptionsBuilder(stop, "file-monitor", options.KrtDebugger)
+ watch, err := krtfiles.NewFolderWatch(fileDir, func(b []byte)
([]*config.Config, error) {
+ return parseInputs(b, domainSuffix)
+ }, stop)
+ if err != nil {
+ return nil, err
+ }
+ mainCollection := krtfiles.NewFileCollection(watch, func(c
*config.Config) *ConfigKind {
+ return &ConfigKind{c}
+ }, opts.WithName("main")...)
+
+ data := make(map[config.GroupVersionKind]kindStore)
+ for _, s := range schemas.All() {
+ gvk := s.GroupVersionKind()
+ if _, ok := collections.Sail.FindByGroupVersionKind(gvk); ok {
+ collection := krt.NewCollection(mainCollection,
func(ctx krt.HandlerContext, c ConfigKind) *config.Config {
+ if c.GroupVersionKind == gvk {
+ return c.Config
+ }
+
+ return nil
+ }, opts.WithName(gvk.Kind)...)
+
+ data[gvk] = kindStore{
+ collection: collection,
+ index: krt.NewNamespaceIndex(collection),
+ }
+ }
+ }
+
+ return &Controller{
+ schemas: schemas,
+ stop: stop,
+ data: data,
+ }, nil
+}
+
+func (c *Controller) Schemas() collection.Schemas {
+ return c.schemas
+}
+
+func (c *Controller) Get(typ config.GroupVersionKind, name, namespace string)
*config.Config {
+ if data, ok := c.data[typ]; ok {
+ if namespace == "" {
+ return data.collection.GetKey(name)
+ }
+
+ return data.collection.GetKey(namespace + "/" + name)
+ }
+
+ return nil
+}
+
+func (c *Controller) List(typ config.GroupVersionKind, namespace string)
[]config.Config {
+ if data, ok := c.data[typ]; ok {
+ if namespace == metav1.NamespaceAll {
+ return data.collection.List()
+ }
+
+ return data.index.Lookup(namespace)
+ }
+
+ return nil
+}
+
+func (c *Controller) Create(config config.Config) (revision string, err error)
{
+ return "", errUnsupportedOp
+}
+
+func (c *Controller) Update(config config.Config) (newRevision string, err
error) {
+ return "", errUnsupportedOp
+}
+
+func (c *Controller) UpdateStatus(config config.Config) (newRevision string,
err error) {
+ return "", errUnsupportedOp
+}
+
+func (c *Controller) Patch(orig config.Config, patchFn config.PatchFunc)
(string, error) {
+ return "", errUnsupportedOp
+}
+
+func (c *Controller) Delete(typ config.GroupVersionKind, name, namespace
string, _ *string) error {
+ return errUnsupportedOp
+}
+
+func (c *Controller) RegisterEventHandler(typ config.GroupVersionKind, handler
model.EventHandler) {
+ if data, ok := c.data[typ]; ok {
+ data.handlers = append(
+ data.handlers,
+ data.collection.RegisterBatch(func(evs
[]krt.Event[config.Config]) {
+ for _, event := range evs {
+ switch event.Event {
+ case controllers.EventAdd:
+ handler(config.Config{},
*event.New, model.EventAdd)
+ case controllers.EventUpdate:
+ handler(*event.Old, *event.New,
model.EventUpdate)
+ case controllers.EventDelete:
+ handler(config.Config{},
*event.Old, model.EventDelete)
+ }
+ }
+ }, false),
+ )
+ }
+}
+
+func (c *Controller) Run(stop <-chan struct{}) {
+ <-stop
+ close(c.stop)
+}
+
+func (c *Controller) HasSynced() bool {
+ for _, data := range c.data {
+ if !data.collection.HasSynced() {
+ return false
+ }
+
+ for _, handler := range data.handlers {
+ if !handler.HasSynced() {
+ return false
+ }
+ }
+ }
+
+ return true
+}
+
+// parseInputs is identical to crd.ParseInputs, except that it returns an
+// array of config pointers and applies the domain suffix to each.
+func parseInputs(data []byte, domainSuffix string) ([]*config.Config, error) {
+ configs, _, err := crd.ParseInputs(string(data))
+
+ // Convert to an array of pointers.
+ refs := make([]*config.Config, len(configs))
+ for i := range configs {
+ refs[i] = &configs[i]
+ refs[i].Domain = domainSuffix
+ }
+ return refs, err
+}
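
A usage sketch for the file store (the directory, opts, gvk, and stop values are assumptions):

    ctl, err := file.NewController("/etc/dubbo/configs", "cluster.local", collections.Sail, opts)
    if err != nil {
        return err
    }
    go ctl.Run(stop)
    // Reads are served from the per-kind krt collections built above.
    all := ctl.List(gvk, metav1.NamespaceAll)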
diff --git a/sail/pkg/config/memory/controller.go
b/sail/pkg/config/memory/controller.go
new file mode 100644
index 00000000..06c121d8
--- /dev/null
+++ b/sail/pkg/config/memory/controller.go
@@ -0,0 +1,140 @@
+package memory
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "k8s.io/apimachinery/pkg/types"
+)
+
+// Controller is an implementation of ConfigStoreController.
+type Controller struct {
+ monitor Monitor
+ configStore model.ConfigStore
+ hasSynced func() bool
+
+ // If meshConfig.DiscoverySelectors are specified, the namespacesFilter
tracks the namespaces this controller watches.
+ namespacesFilter func(obj interface{}) bool
+}
+
+// NewController returns an implementation of ConfigStoreController.
+// This is a client-side monitor that dispatches events as changes are
+// made on the client.
+func NewController(cs model.ConfigStore) *Controller {
+ out := &Controller{
+ configStore: cs,
+ // TODO monitor
+ }
+ return out
+}
+
+func (c *Controller) RegisterHasSyncedHandler(cb func() bool) {
+ c.hasSynced = cb
+}
+
+func (c *Controller) RegisterEventHandler(kind config.GroupVersionKind, f
model.EventHandler) {
+ c.monitor.AppendEventHandler(kind, f)
+}
+
+// HasSynced returns whether the store has synced.
+// It can be controlled externally (such as by the data source);
+// otherwise, the store always considers itself synced.
+func (c *Controller) HasSynced() bool {
+ if c.hasSynced != nil {
+ return c.hasSynced()
+ }
+ return true
+}
+
+func (c *Controller) Run(stop <-chan struct{}) {
+ c.monitor.Run(stop)
+}
+
+func (c *Controller) Schemas() collection.Schemas {
+ return c.configStore.Schemas()
+}
+
+func (c *Controller) Get(kind config.GroupVersionKind, key, namespace string)
*config.Config {
+ if c.namespacesFilter != nil && !c.namespacesFilter(namespace) {
+ return nil
+ }
+ return c.configStore.Get(kind, key, namespace)
+}
+
+func (c *Controller) Create(config config.Config) (revision string, err error)
{
+ if revision, err = c.configStore.Create(config); err == nil {
+ c.monitor.ScheduleProcessEvent(ConfigEvent{
+ config: config,
+ event: model.EventAdd,
+ })
+ }
+ return
+}
+
+func (c *Controller) Update(config config.Config) (newRevision string, err
error) {
+ oldconfig := c.configStore.Get(config.GroupVersionKind, config.Name,
config.Namespace)
+ if newRevision, err = c.configStore.Update(config); err == nil {
+ c.monitor.ScheduleProcessEvent(ConfigEvent{
+ old: *oldconfig,
+ config: config,
+ event: model.EventUpdate,
+ })
+ }
+ return
+}
+
+func (c *Controller) UpdateStatus(config config.Config) (newRevision string,
err error) {
+ oldconfig := c.configStore.Get(config.GroupVersionKind, config.Name,
config.Namespace)
+ if newRevision, err = c.configStore.UpdateStatus(config); err == nil {
+ c.monitor.ScheduleProcessEvent(ConfigEvent{
+ old: *oldconfig,
+ config: config,
+ event: model.EventUpdate,
+ })
+ }
+ return
+}
+
+func (c *Controller) Patch(orig config.Config, patchFn config.PatchFunc)
(newRevision string, err error) {
+ cfg, typ := patchFn(orig.DeepCopy())
+ switch typ {
+ case types.MergePatchType:
+ case types.JSONPatchType:
+ default:
+ return "", fmt.Errorf("unsupported merge type: %s", typ)
+ }
+ if newRevision, err = c.configStore.Patch(cfg, patchFn); err == nil {
+ c.monitor.ScheduleProcessEvent(ConfigEvent{
+ old: orig,
+ config: cfg,
+ event: model.EventUpdate,
+ })
+ }
+ return
+}
+
+func (c *Controller) Delete(kind config.GroupVersionKind, key, namespace
string, resourceVersion *string) error {
+ if config := c.Get(kind, key, namespace); config != nil {
+ if err := c.configStore.Delete(kind, key, namespace,
resourceVersion); err != nil {
+ return err
+ }
+ c.monitor.ScheduleProcessEvent(ConfigEvent{
+ config: *config,
+ event: model.EventDelete,
+ })
+ return nil
+ }
+ return fmt.Errorf("delete: config %v/%v/%v does not exist", kind,
namespace, key)
+}
+
+func (c *Controller) List(kind config.GroupVersionKind, namespace string)
[]config.Config {
+ configs := c.configStore.List(kind, namespace)
+ if c.namespacesFilter != nil {
+ return slices.Filter(configs, func(config config.Config) bool {
+ return c.namespacesFilter(config)
+ })
+ }
+ return configs
+}
diff --git a/sail/pkg/config/memory/monitor.go
b/sail/pkg/config/memory/monitor.go
new file mode 100644
index 00000000..49daad07
--- /dev/null
+++ b/sail/pkg/config/memory/monitor.go
@@ -0,0 +1,22 @@
+package memory
+
+import (
+ config2 "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+)
+
+type Handler func(config2.Config, config2.Config, model.Event)
+
+// Monitor provides methods of manipulating changes in the config store
+type Monitor interface {
+ Run(<-chan struct{})
+ AppendEventHandler(config2.GroupVersionKind, Handler)
+ ScheduleProcessEvent(ConfigEvent)
+}
+
+// ConfigEvent defines the event to be processed
+type ConfigEvent struct {
+ config config2.Config
+ old config2.Config
+ event model.Event
+}
diff --git a/sail/pkg/config/memory/store.go b/sail/pkg/config/memory/store.go
new file mode 100644
index 00000000..f36cde5e
--- /dev/null
+++ b/sail/pkg/config/memory/store.go
@@ -0,0 +1,250 @@
+package memory
+
+import (
+ "errors"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "sync"
+ "time"
+)
+
+var (
+ errNotFound = errors.New("item not found")
+ errAlreadyExists = errors.New("item already exists")
+	// TODO: can we make this compatible with kerror.IsConflict without
+	// importing the library?
+ errConflict = errors.New("conflicting resource version, try again")
+)
+
+// Make creates an in-memory config store from a set of config schemas,
+// with validation enabled.
+func Make(schemas collection.Schemas) model.ConfigStore {
+ return newStore(schemas, false)
+}
+
+type store struct {
+ schemas collection.Schemas
+ data map[config.GroupVersionKind]map[string]map[string]any
+ skipValidation bool
+ mutex sync.RWMutex
+}
+
+func newStore(schemas collection.Schemas, skipValidation bool)
model.ConfigStore {
+ out := store{
+ schemas: schemas,
+ data:
make(map[config.GroupVersionKind]map[string]map[string]any),
+ skipValidation: skipValidation,
+ }
+ for _, s := range schemas.All() {
+ out.data[s.GroupVersionKind()] = make(map[string]map[string]any)
+ }
+ return &out
+}
+
+func (cr *store) Schemas() collection.Schemas {
+ return cr.schemas
+}
+func (cr *store) Get(kind config.GroupVersionKind, name, namespace string)
*config.Config {
+ cr.mutex.RLock()
+ defer cr.mutex.RUnlock()
+ _, ok := cr.data[kind]
+ if !ok {
+ return nil
+ }
+
+ ns, exists := cr.data[kind][namespace]
+ if !exists {
+ return nil
+ }
+
+ out, exists := ns[name]
+ if !exists {
+ return nil
+ }
+ config := out.(config.Config)
+
+ return &config
+}
+
+func (cr *store) List(kind config.GroupVersionKind, namespace string)
[]config.Config {
+ cr.mutex.RLock()
+ defer cr.mutex.RUnlock()
+ data, exists := cr.data[kind]
+ if !exists {
+ return nil
+ }
+
+ var size int
+ if namespace == "" {
+ for _, ns := range data {
+ size += len(ns)
+ }
+ } else {
+ size = len(data[namespace])
+ }
+
+ out := make([]config.Config, 0, size)
+ if namespace == "" {
+ for _, ns := range data {
+ for _, value := range ns {
+ out = append(out, value.(config.Config))
+ }
+ }
+ } else {
+ ns, exists := data[namespace]
+ if !exists {
+ return nil
+ }
+ for _, value := range ns {
+ out = append(out, value.(config.Config))
+ }
+ }
+ return out
+}
+
+func (cr *store) Delete(kind config.GroupVersionKind, name, namespace string,
resourceVersion *string) error {
+ cr.mutex.Lock()
+ defer cr.mutex.Unlock()
+ data, ok := cr.data[kind]
+ if !ok {
+ return fmt.Errorf("unknown type %v", kind)
+ }
+ ns, exists := data[namespace]
+ if !exists {
+ return errNotFound
+ }
+
+ _, exists = ns[name]
+ if !exists {
+ return errNotFound
+ }
+
+ delete(ns, name)
+ return nil
+}
+
+func (cr *store) Create(cfg config.Config) (string, error) {
+ cr.mutex.Lock()
+ defer cr.mutex.Unlock()
+ kind := cfg.GroupVersionKind
+ s, ok := cr.schemas.FindByGroupVersionKind(kind)
+ if !ok {
+ return "", fmt.Errorf("unknown type %v", kind)
+ }
+ if !cr.skipValidation {
+ if _, err := s.ValidateConfig(cfg); err != nil {
+ return "", err
+ }
+ }
+ ns, exists := cr.data[kind][cfg.Namespace]
+ if !exists {
+ ns = map[string]any{}
+ cr.data[kind][cfg.Namespace] = ns
+ }
+
+ _, exists = ns[cfg.Name]
+
+ if !exists {
+ tnow := time.Now()
+ if cfg.ResourceVersion == "" {
+ cfg.ResourceVersion = tnow.String()
+ }
+ // Set the creation timestamp, if not provided.
+ if cfg.CreationTimestamp.IsZero() {
+ cfg.CreationTimestamp = tnow
+ }
+
+ ns[cfg.Name] = cfg
+ return cfg.ResourceVersion, nil
+ }
+ return "", errAlreadyExists
+}
+
+func (cr *store) Update(cfg config.Config) (string, error) {
+ cr.mutex.Lock()
+ defer cr.mutex.Unlock()
+ kind := cfg.GroupVersionKind
+ s, ok := cr.schemas.FindByGroupVersionKind(kind)
+ if !ok {
+ return "", fmt.Errorf("unknown type %v", kind)
+ }
+ if !cr.skipValidation {
+ if _, err := s.ValidateConfig(cfg); err != nil {
+ return "", err
+ }
+ }
+
+ ns, exists := cr.data[kind][cfg.Namespace]
+ if !exists {
+ return "", errNotFound
+ }
+
+ existing, exists := ns[cfg.Name]
+ if !exists {
+ return "", errNotFound
+ }
+ if hasConflict(existing.(config.Config), cfg) {
+ return "", errConflict
+ }
+ // TODO Annotations[ResourceVersion]
+ if cfg.Annotations != nil && cfg.Annotations[""] != "" {
+ cfg.ResourceVersion = cfg.Annotations[""]
+ delete(cfg.Annotations, "")
+ } else {
+ cfg.ResourceVersion = time.Now().String()
+ }
+
+ ns[cfg.Name] = cfg
+ return cfg.ResourceVersion, nil
+}
+
+func (cr *store) UpdateStatus(cfg config.Config) (string, error) {
+ return cr.Update(cfg)
+}
+
+func (cr *store) Patch(orig config.Config, patchFn config.PatchFunc) (string,
error) {
+ cr.mutex.Lock()
+ defer cr.mutex.Unlock()
+
+ gvk := orig.GroupVersionKind
+ s, ok := cr.schemas.FindByGroupVersionKind(gvk)
+ if !ok {
+ return "", fmt.Errorf("unknown type %v", gvk)
+ }
+
+ cfg, _ := patchFn(orig)
+ if !cr.skipValidation {
+ if _, err := s.ValidateConfig(cfg); err != nil {
+ return "", err
+ }
+ }
+
+ _, ok = cr.data[gvk]
+ if !ok {
+ return "", errNotFound
+ }
+ ns, exists := cr.data[gvk][orig.Namespace]
+ if !exists {
+ return "", errNotFound
+ }
+
+ rev := time.Now().String()
+ cfg.ResourceVersion = rev
+ ns[cfg.Name] = cfg
+
+ return rev, nil
+}
+
+// hasConflict checks if the two resources have a conflict, which will block
Update calls
+func hasConflict(existing, replacement config.Config) bool {
+ if replacement.ResourceVersion == "" {
+ // We don't care about resource version, so just always
overwrite
+ return false
+ }
+	// A resource version was set but does not match, so it is a conflict.
+ if replacement.ResourceVersion != existing.ResourceVersion {
+ return true
+ }
+ return false
+}
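
A sketch of the optimistic-concurrency behavior above (cfg is an assumed valid config):

    s := memory.Make(collections.Sail)
    rv, _ := s.Create(cfg) // assigns a resource version when empty
    cfg.ResourceVersion = rv
    _, err := s.Update(cfg) // accepted: versions match
    cfg.ResourceVersion = "stale"
    _, err = s.Update(cfg) // rejected: "conflicting resource version, try again"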
diff --git a/sail/pkg/credentials/kube/secrets.go
b/sail/pkg/credentials/kube/secrets.go
new file mode 100644
index 00000000..191d2484
--- /dev/null
+++ b/sail/pkg/credentials/kube/secrets.go
@@ -0,0 +1,87 @@
+package kube
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/credentials"
+ "sort"
+ "strings"
+)
+
+const (
+ // The ID/name for the certificate chain in kubernetes generic secret.
+ GenericScrtCert = "cert"
+ // The ID/name for the private key in kubernetes generic secret.
+ GenericScrtKey = "key"
+ // The ID/name for the CA certificate in kubernetes generic secret.
+ GenericScrtCaCert = "cacert"
+ // The ID/name for the CRL in kubernetes generic secret.
+ GenericScrtCRL = "crl"
+
+ // The ID/name for the certificate chain in kubernetes tls secret.
+ TLSSecretCert = "tls.crt"
+ // The ID/name for the k8sKey in kubernetes tls secret.
+ TLSSecretKey = "tls.key"
+ // The ID/name for the certificate OCSP staple in kubernetes tls secret
+ TLSSecretOcspStaple = "tls.ocsp-staple"
+ // The ID/name for the CA certificate in kubernetes tls secret
+ TLSSecretCaCert = "ca.crt"
+ // The ID/name for the CRL in kubernetes tls secret.
+ TLSSecretCrl = "ca.crl"
+)
+
+func hasKeys(d map[string][]byte, keys ...string) bool {
+ for _, k := range keys {
+ _, f := d[k]
+ if !f {
+ return false
+ }
+ }
+ return true
+}
+
+func hasValue(d map[string][]byte, keys ...string) bool {
+ for _, k := range keys {
+ v := d[k]
+ if len(v) == 0 {
+ return false
+ }
+ }
+ return true
+}
+
+func truncatedKeysMessage(data map[string][]byte) string {
+ keys := []string{}
+ for k := range data {
+ keys = append(keys, k)
+ }
+ sort.Strings(keys)
+ if len(keys) < 3 {
+ return strings.Join(keys, ", ")
+ }
+ return fmt.Sprintf("%s, and %d more...", strings.Join(keys[:3], ", "),
len(keys)-3)
+}
+
+// ExtractRoot extracts the root certificate (and CRL, if present) from secret data.
+func ExtractRoot(data map[string][]byte) (certInfo *credentials.CertInfo, err
error) {
+ ret := &credentials.CertInfo{}
+ if hasValue(data, GenericScrtCaCert) {
+ ret.Cert = data[GenericScrtCaCert]
+ ret.CRL = data[GenericScrtCRL]
+ return ret, nil
+ }
+ if hasValue(data, TLSSecretCaCert) {
+ ret.Cert = data[TLSSecretCaCert]
+ ret.CRL = data[TLSSecretCrl]
+ return ret, nil
+ }
+ // No cert found. Try to generate a helpful error message
+ if hasKeys(data, GenericScrtCaCert) {
+ return nil, fmt.Errorf("found key %q, but it was empty",
GenericScrtCaCert)
+ }
+ if hasKeys(data, TLSSecretCaCert) {
+ return nil, fmt.Errorf("found key %q, but it was empty",
TLSSecretCaCert)
+ }
+ found := truncatedKeysMessage(data)
+ return nil, fmt.Errorf("found secret, but didn't have expected keys %s
or %s; found: %s",
+ GenericScrtCaCert, TLSSecretCaCert, found)
+}
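
A sketch of the accepted secret layouts (caPEM and crlPEM are assumed byte slices):

    data := map[string][]byte{
        "ca.crt": caPEM,  // TLSSecretCaCert
        "ca.crl": crlPEM, // TLSSecretCrl
    }
    info, err := kube.ExtractRoot(data)
    // info.Cert == caPEM and info.CRL == crlPEM on success; a key that is
    // present but empty yields a descriptive error instead.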
diff --git a/sail/pkg/credentials/model.go b/sail/pkg/credentials/model.go
new file mode 100644
index 00000000..f806593d
--- /dev/null
+++ b/sail/pkg/credentials/model.go
@@ -0,0 +1,13 @@
+package credentials
+
+// CertInfo wraps certificate, key, and OCSP staple information.
+type CertInfo struct {
+ // The certificate chain
+ Cert []byte
+ // The private key
+ Key []byte
+	// The OCSP staple
+ Staple []byte
+ // Certificate Revocation List information
+ CRL []byte
+}
diff --git a/sail/pkg/model/config.go b/sail/pkg/model/config.go
new file mode 100644
index 00000000..78e5b91e
--- /dev/null
+++ b/sail/pkg/model/config.go
@@ -0,0 +1,26 @@
+package model
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+)
+
+type ConfigStore interface {
+ Schemas() collection.Schemas
+ Get(typ config.GroupVersionKind, name, namespace string) *config.Config
+ List(typ config.GroupVersionKind, namespace string) []config.Config
+ Create(config config.Config) (revision string, err error)
+ Update(config config.Config) (newRevision string, err error)
+ UpdateStatus(config config.Config) (newRevision string, err error)
+ Patch(orig config.Config, patchFn config.PatchFunc) (string, error)
+ Delete(typ config.GroupVersionKind, name, namespace string,
resourceVersion *string) error
+}
+
+type EventHandler = func(config.Config, config.Config, Event)
+
+type ConfigStoreController interface {
+ ConfigStore
+ RegisterEventHandler(kind config.GroupVersionKind, handler EventHandler)
+ Run(stop <-chan struct{})
+ HasSynced() bool
+}
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index 626c78d3..49705d61 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -32,7 +32,9 @@ import (
type Watcher = meshwatcher.WatcherCollection
type Environment struct {
+ ServiceDiscovery
Watcher
+ ConfigStore
mutex sync.RWMutex
pushContext *PushContext
Cache XdsCache
@@ -107,5 +109,4 @@ func (e *Environment) GetDiscoveryAddress() (host.Name,
string, error) {
return host.Name(hostname), port, nil
}
-type Proxy struct {
-}
+type Proxy struct{}
diff --git a/sail/pkg/model/controller.go b/sail/pkg/model/controller.go
new file mode 100644
index 00000000..a79c32f7
--- /dev/null
+++ b/sail/pkg/model/controller.go
@@ -0,0 +1,27 @@
+package model
+
+type Controller interface {
+ // Run until a signal is received
+ Run(stop <-chan struct{})
+}
+
+type Event int
+
+const (
+ EventAdd Event = iota
+ EventUpdate
+ EventDelete
+)
+
+func (event Event) String() string {
+ out := "unknown"
+ switch event {
+ case EventAdd:
+ out = "add"
+ case EventUpdate:
+ out = "update"
+ case EventDelete:
+ out = "delete"
+ }
+ return out
+}
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index bf2e3b88..76870e34 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -46,3 +46,6 @@ func (pr *PushRequest) CopyMerge(other *PushRequest)
*PushRequest {
merged := &PushRequest{}
return merged
}
+
+type XDSUpdater interface {
+}
\ No newline at end of file
diff --git a/sail/pkg/model/service.go b/sail/pkg/model/service.go
new file mode 100644
index 00000000..6573e2fb
--- /dev/null
+++ b/sail/pkg/model/service.go
@@ -0,0 +1,4 @@
+package model
+
+type ServiceDiscovery interface {
+}
diff --git a/sail/pkg/networking/util/util.go b/sail/pkg/networking/util/util.go
new file mode 100644
index 00000000..471493b0
--- /dev/null
+++ b/sail/pkg/networking/util/util.go
@@ -0,0 +1,6 @@
+package util
+
+const (
+ // PassthroughFilterChain to catch traffic that doesn't match other
filter chains.
+ PassthroughFilterChain = "PassthroughFilterChain"
+)
\ No newline at end of file
diff --git a/sail/pkg/server/instance.go b/sail/pkg/server/instance.go
index af7847ba..1ac26f66 100644
--- a/sail/pkg/server/instance.go
+++ b/sail/pkg/server/instance.go
@@ -18,6 +18,7 @@
package server
import (
+ "k8s.io/klog/v2"
"time"
)
@@ -30,6 +31,7 @@ type task struct {
type Instance interface {
Start(stop <-chan struct{}) error
+ RunComponent(name string, t Component)
}
type instance struct {
@@ -87,3 +89,12 @@ func (i *instance) Start(stop <-chan struct{}) error {
return nil
}
+
+func (i *instance) RunComponent(name string, t Component) {
+ select {
+ case <-i.done:
+ klog.Infof("attempting to run a new component %q after the
server was shutdown", name)
+ default:
+ i.components <- task{name, t}
+ }
+}
diff --git a/sail/pkg/serviceregistry/aggregate/controller.go
b/sail/pkg/serviceregistry/aggregate/controller.go
new file mode 100644
index 00000000..0b220755
--- /dev/null
+++ b/sail/pkg/serviceregistry/aggregate/controller.go
@@ -0,0 +1,55 @@
+package aggregate
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
+ "k8s.io/klog/v2"
+ "sync"
+)
+
+type Controller struct {
+ registries []*registryEntry
+ storeLock sync.RWMutex
+ running bool
+ meshHolder mesh.Holder
+ configClusterID cluster.ID
+}
+
+type registryEntry struct {
+ serviceregistry.Instance
+	// stop, if not nil, is the per-registry stop chan. If nil, the server
+	// stop chan should be used to Run the registry.
+ stop <-chan struct{}
+}
+
+type Options struct {
+ MeshHolder mesh.Holder
+ ConfigClusterID cluster.ID
+}
+
+func NewController(opt Options) *Controller {
+ return &Controller{
+ registries: make([]*registryEntry, 0),
+ configClusterID: opt.ConfigClusterID,
+ meshHolder: opt.MeshHolder,
+ running: false,
+ }
+}
+
+// Run starts all the controllers
+func (c *Controller) Run(stop <-chan struct{}) {
+ c.storeLock.Lock()
+ for _, r := range c.registries {
+ // prefer the per-registry stop channel
+ registryStop := stop
+ if s := r.stop; s != nil {
+ registryStop = s
+ }
+ go r.Run(registryStop)
+ }
+ c.running = true
+ c.storeLock.Unlock()
+
+ <-stop
+ klog.Info("Registry Aggregator terminated")
+}
diff --git a/sail/pkg/serviceregistry/instance.go
b/sail/pkg/serviceregistry/instance.go
new file mode 100644
index 00000000..f08a0bbb
--- /dev/null
+++ b/sail/pkg/serviceregistry/instance.go
@@ -0,0 +1,20 @@
+package serviceregistry
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+)
+
+// Instance of a service registry. A single service registry combines the
capabilities of service discovery
+// and the controller for managing asynchronous events.
+type Instance interface {
+ model.Controller
+ model.ServiceDiscovery
+
+	// Provider backing this service registry (e.g. Kubernetes)
+ Provider() provider.ID
+
+ // Cluster for which the service registry applies. Only needed for
multicluster systems.
+ Cluster() cluster.ID
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/controller.go
b/sail/pkg/serviceregistry/kube/controller/controller.go
index 4d1a7582..dac2cf31 100644
--- a/sail/pkg/serviceregistry/kube/controller/controller.go
+++ b/sail/pkg/serviceregistry/kube/controller/controller.go
@@ -17,12 +17,28 @@
package controller
-import "github.com/apache/dubbo-kubernetes/pkg/cluster"
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+ "github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+)
type Options struct {
KubernetesAPIQPS float32
KubernetesAPIBurst int
DomainSuffix string
- ClusterID cluster.ID
- ClusterAliases map[string]string
+ // XDSUpdater will push changes to the xDS server.
+ XDSUpdater model.XDSUpdater
+ // MeshNetworksWatcher observes changes to the mesh networks config.
+ MeshNetworksWatcher mesh.NetworksWatcher
+ // MeshWatcher observes changes to the mesh config
+ MeshWatcher meshwatcher.WatcherCollection
+ ClusterID cluster.ID
+ ClusterAliases map[string]string
+ SystemNamespace string
+ MeshServiceController *aggregate.Controller
+ KrtDebugger *krt.DebugHandler
}
diff --git a/sail/pkg/serviceregistry/providers/providers.go
b/sail/pkg/serviceregistry/provider/provider.go
similarity index 98%
rename from sail/pkg/serviceregistry/providers/providers.go
rename to sail/pkg/serviceregistry/provider/provider.go
index f0532b1c..0d0f7b46 100644
--- a/sail/pkg/serviceregistry/providers/providers.go
+++ b/sail/pkg/serviceregistry/provider/provider.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package providers
+package provider
// ID defines underlying platform supporting service registry
type ID string
diff --git a/sail/pkg/xds/v3/model.go b/sail/pkg/xds/v3/model.go
new file mode 100644
index 00000000..790de20c
--- /dev/null
+++ b/sail/pkg/xds/v3/model.go
@@ -0,0 +1,11 @@
+package v3
+
+import "github.com/apache/dubbo-kubernetes/pkg/model"
+
+const (
+ ClusterType = model.ClusterType
+ ListenerType = model.ListenerType
+ EndpointType = model.EndpointType
+ RouteType = model.RouteType
+ DebugType = model.DebugType
+)
diff --git a/security/pkg/pki/ca/ca.go b/security/pkg/pki/ca/ca.go
index 48b85751..a9eb7a80 100644
--- a/security/pkg/pki/ca/ca.go
+++ b/security/pkg/pki/ca/ca.go
@@ -313,7 +313,7 @@ func loadSelfSignedCaSecret(client corev1.CoreV1Interface,
namespace string, caC
); err != nil {
return fmt.Errorf("failed to create CA KeyCertBundle
(%v)", err)
}
- klog.Infof("Using existing public key: %v", string(rootCerts))
+ klog.Infof("Using existing public key: \n%v", string(rootCerts))
}
return err
}