This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c0a5811 [sail] add init container (#790)
7c0a5811 is described below

commit 7c0a58111e750f692332f01fa4ef769889a8f966
Author: Jian Zhong <[email protected]>
AuthorDate: Sun Sep 21 15:41:39 2025 +0800

    [sail] add init container (#790)
---
 go.mod                                             |   3 +-
 go.sum                                             |   2 +
 pkg/adsc/adsc.go                                   | 830 +++++++++++++++++++++
 pkg/adsc/util.go                                   |  57 ++
 pkg/config/schema/collection/schemas.go            | 232 ++++++
 pkg/config/schema/collections/collections.go       |  14 +
 pkg/config/schema/resource/schema.go               | 369 +++++++++
 pkg/config/validation/validation.go                |  15 +-
 pkg/kube/collection/{collection.go => schemas.go}  |   0
 pkg/kube/krt/core.go                               |   6 +
 pkg/kube/krt/debug.go                              |  15 +
 pkg/kube/krt/files/files.go                        | 199 ++++-
 pkg/kube/krt/index.go                              |   8 +
 pkg/kube/krt/static.go                             | 251 +++++++
 pkg/kube/krt/sync.go                               |  10 +
 pkg/model/xds.go                                   |  11 +
 pkg/security/security.go                           |  36 +
 pkg/wellknown/wellknown.go                         |  13 +
 sail/cmd/sail-discovery/app/cmd.go                 |   6 +-
 sail/pkg/bootstrap/configcontroller.go             | 257 +++++++
 sail/pkg/bootstrap/server.go                       |  67 +-
 sail/pkg/bootstrap/servicecontroller.go            |  62 ++
 sail/pkg/bootstrap/util.go                         |   4 +-
 sail/pkg/config/aggregate/config.go                | 189 +++++
 sail/pkg/config/kube/crd/config.go                 |  13 +
 sail/pkg/config/kube/crd/conversion.go             |  57 ++
 sail/pkg/config/kube/file/controller.go            | 192 +++++
 sail/pkg/config/memory/controller.go               | 140 ++++
 sail/pkg/config/memory/monitor.go                  |  22 +
 sail/pkg/config/memory/store.go                    | 250 +++++++
 sail/pkg/credentials/kube/secrets.go               |  87 +++
 sail/pkg/credentials/model.go                      |  13 +
 sail/pkg/model/config.go                           |  26 +
 sail/pkg/model/context.go                          |   5 +-
 sail/pkg/model/controller.go                       |  27 +
 sail/pkg/model/push_context.go                     |   3 +
 sail/pkg/model/service.go                          |   4 +
 sail/pkg/networking/util/util.go                   |   6 +
 sail/pkg/server/instance.go                        |  11 +
 sail/pkg/serviceregistry/aggregate/controller.go   |  55 ++
 sail/pkg/serviceregistry/instance.go               |  20 +
 .../serviceregistry/kube/controller/controller.go  |  22 +-
 .../providers.go => provider/provider.go}          |   2 +-
 sail/pkg/xds/v3/model.go                           |  11 +
 security/pkg/pki/ca/ca.go                          |   2 +-
 45 files changed, 3587 insertions(+), 37 deletions(-)

diff --git a/go.mod b/go.mod
index d3038f32..5b397c0a 100644
--- a/go.mod
+++ b/go.mod
@@ -38,6 +38,7 @@ require (
        github.com/go-jose/go-jose/v4 v4.0.5
        github.com/gogo/protobuf v1.3.2
        github.com/golang/protobuf v1.5.4
+       github.com/google/go-cmp v0.7.0
        github.com/google/go-containerregistry v0.20.6
        github.com/hashicorp/go-multierror v1.1.1
        github.com/heroku/color v0.0.6
@@ -71,6 +72,7 @@ require (
 )
 
 require (
+       cel.dev/expr v0.24.0 // indirect
        dario.cat/mergo v1.0.2 // indirect
        github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect
        github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // 
indirect
@@ -143,7 +145,6 @@ require (
        github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // 
indirect
        github.com/google/btree v1.1.3 // indirect
        github.com/google/gnostic-models v0.6.9 // indirect
-       github.com/google/go-cmp v0.7.0 // indirect
        github.com/google/pprof v0.0.0-20250607225305-033d6d78b36a // indirect
        github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
        github.com/google/uuid v1.6.0 // indirect
diff --git a/go.sum b/go.sum
index 54d2b529..c0bd9a31 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,5 @@
+cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
+cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
 cloud.google.com/go v0.26.0/go.mod 
h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.114.0 h1:OIPFAdfrFDFO2ve2U7r/H5SwSbBzEdrBdE7xkgwc+kY=
 cloud.google.com/go v0.114.0/go.mod 
h1:ZV9La5YYxctro1HTPug5lXH/GefROyW8PPD4T8n9J8E=
diff --git a/pkg/adsc/adsc.go b/pkg/adsc/adsc.go
new file mode 100644
index 00000000..16282d67
--- /dev/null
+++ b/pkg/adsc/adsc.go
@@ -0,0 +1,830 @@
+package adsc
+
+import (
+       "context"
+       "crypto/tls"
+       "crypto/x509"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/backoff"
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       "github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
+       "github.com/apache/dubbo-kubernetes/pkg/wellknown"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/networking/util"
+       v3 "github.com/apache/dubbo-kubernetes/sail/pkg/xds/v3"
+       cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
+       core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+       endpoint 
"github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+       listener 
"github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+       route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+       discovery 
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/proto"
+       anypb "google.golang.org/protobuf/types/known/anypb"
+       pstruct "google.golang.org/protobuf/types/known/structpb"
+       mcp "istio.io/api/mcp/v1alpha1"
+       "istio.io/api/mesh/v1alpha1"
+       "k8s.io/klog/v2"
+       "math"
+       "net"
+       "os"
+       "strings"
+       "sync"
+       "time"
+)
+
+const (
+       defaultClientMaxReceiveMessageSize = math.MaxInt32
+       defaultInitialConnWindowSize       = 1024 * 1024 // passed to grpc.WithInitialConnWindowSize
+       defaultInitialWindowSize           = 1024 * 1024 // passed to grpc.WithInitialWindowSize
+)
+
+// Config holds the connection settings used to dial an xDS server and to
+// describe the connecting node.
+type Config struct {
+       // Address of the xDS server
+       Address string
+       // Namespace defaults to 'default' (applied by setDefaultConfig).
+       Namespace string
+       // CertDir is the directory where mTLS certs are configured.
+       // If CertDir and SecretManager are both unset, dialWithConfig does not configure TLS.
+       // TODO: implement SecretManager for cert dir
+       CertDir string
+       // SecretManager is the interface used for getting keys and rootCA.
+       SecretManager security.SecretManager
+       GrpcOpts      []grpc.DialOption
+       // XDSRootCAFile explicitly sets the root CA to be used for the XDS connection.
+       // Mirrors Envoy file.
+       XDSRootCAFile string
+       // XDSSAN is the expected SAN of the XDS server. If not set, the host part of Address is used.
+       XDSSAN string
+       // InsecureSkipVerify skips the client's verification of the server's certificate chain and host name.
+       InsecureSkipVerify bool
+       // Workload defaults to 'test'.
+       // NOTE(review): setDefaultConfig does not apply this default - confirm where it is set.
+       Workload string
+       // Revision for this control plane instance. We will only read configs that match this revision.
+       Revision string
+       // Meta includes additional metadata for the node
+       Meta *pstruct.Struct
+       // IP is currently the primary key used to locate inbound configs. It is sent by client,
+       // must match a known endpoint IP. Tests can use a ServiceEntry to register fake IPs.
+       IP       string
+       Locality *core.Locality
+       // BackoffPolicy determines the reconnect policy. Based on MCP client.
+       // When nil, a failed stream closes the client instead of reconnecting.
+       BackoffPolicy backoff.BackOff
+}
+
+// ADSC implements a basic client for ADS, for use in stress tests and tools
+// or libraries that need to connect to Istio pilot or other ADS servers.
+//
+// Fields without their own comment, as used by the code in this file:
+//   - conn is the gRPC connection created by Dial.
+//   - watchTime is set by Run when it sends a cluster (CDS) request.
+//   - errChan is offered the error that ended the stream in handleRecv.
+//   - XDSUpdates is offered every processed response without blocking.
+//   - VersionInfo holds the last version string received per type URL.
+//   - Mesh is replaced when a MeshConfig response arrives.
+//   - sync records, per MCP GVK string, when the first response arrived.
+//   - mutex is held around writes to closed, sync, Received and the
+//     cluster/endpoint maps.
+type ADSC struct {
+       // stream is the gRPC ADS stream, allowing direct gRPC send operations.
+       // It is set by Run, not by Dial.
+       stream 
discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
+       // xds client used to create a stream
+       client discovery.AggregatedDiscoveryServiceClient
+       conn   *grpc.ClientConn
+       // Indicates if the ADSC client is closed
+       closed    bool
+       watchTime time.Time
+       // Updates includes the type of the last update received from the server.
+       Updates     chan string
+       errChan     chan error
+       XDSUpdates  chan *discovery.DiscoveryResponse
+       VersionInfo map[string]string
+
+       // Last received message, by type
+       Received map[string]*discovery.DiscoveryResponse
+
+       mutex sync.RWMutex
+
+       Mesh *v1alpha1.MeshConfig
+
+       // Retrieved configurations can be stored using the common istio model interface.
+       Store model.ConfigStore
+       cfg   *ADSConfig
+       // sendNodeMeta is set to true if the connection is new - and we need to send node meta.
+       sendNodeMeta bool
+       // initialLoad tracks the time to receive the initial configuration.
+       initialLoad time.Duration
+       // indicates if the initial LDS request is sent
+       initialLds bool
+       // httpListeners contains received listeners with a http_connection_manager filter.
+       httpListeners map[string]*listener.Listener
+       // tcpListeners contains all listeners of type TCP (not-HTTP)
+       tcpListeners map[string]*listener.Listener
+       // All received clusters of type eds, keyed by name
+       edsClusters map[string]*cluster.Cluster
+       // All received clusters of no-eds type, keyed by name
+       clusters map[string]*cluster.Cluster
+       // All received routes, keyed by route name
+       routes map[string]*route.RouteConfiguration
+       // All received endpoints, keyed by cluster name
+       eds  map[string]*endpoint.ClusterLoadAssignment
+       sync map[string]time.Time
+}
+
+// ResponseHandler is an optional hook set on ADSConfig. handleRecv calls
+// HandleResponse with the client and every DiscoveryResponse it receives,
+// before the response is processed by the client itself.
+type ResponseHandler interface {
+       HandleResponse(con *ADSC, response *discovery.DiscoveryResponse)
+}
+
+// ADSConfig is the configuration for an ADS connection: the embedded Config
+// plus the requests to send when a stream is opened and an optional response hook.
+type ADSConfig struct {
+       Config
+
+       // InitialDiscoveryRequests is a list of resources to watch at first,
+       // represented as URLs (for new XDS resource naming) or type URLs.
+       // Run sends them, in order, each time it opens a stream.
+       InitialDiscoveryRequests []*discovery.DiscoveryRequest
+
+       // ResponseHandler will be called on each DiscoveryResponse.
+       // TODO: mirror Generator, allow adding handler per type
+       ResponseHandler ResponseHandler
+}
+
+// New creates an ADSC for the xDS server at discoveryAddr and dials it.
+//
+// A nil opts is treated as an empty ADSConfig. Otherwise opts is updated in
+// place: its Config receives the defaults from setDefaultConfig and its
+// Address is overwritten with discoveryAddr.
+//
+// New only establishes the gRPC client connection through Dial. It does not
+// open the ADS stream or send InitialDiscoveryRequests; call Run for that.
+// It returns the error from Dial, if any.
+func New(discoveryAddr string, opts *ADSConfig) (*ADSC, error) {
+       if opts == nil {
+               opts = &ADSConfig{}
+       }
+       opts.Config = setDefaultConfig(&opts.Config)
+       opts.Address = discoveryAddr
+       adsc := &ADSC{
+               Updates:     make(chan string, 100),
+               XDSUpdates:  make(chan *discovery.DiscoveryResponse, 100),
+               VersionInfo: map[string]string{},
+               Received:    map[string]*discovery.DiscoveryResponse{},
+               cfg:         opts,
+               sync:        map[string]time.Time{},
+               errChan:     make(chan error, 10),
+       }
+       if err := adsc.Dial(); err != nil {
+               return nil, err
+       }
+
+       return adsc, nil
+}
+
+// defaultGrpcDialOptions returns the three dial options applied to every ADSC
+// connection: the initial stream window, the initial connection window and
+// the maximum size of a received message.
+func defaultGrpcDialOptions() []grpc.DialOption {
+       // TODO(SpecialYang) maybe need to make it configurable.
+       streamWindow := grpc.WithInitialWindowSize(int32(defaultInitialWindowSize))
+       connWindow := grpc.WithInitialConnWindowSize(int32(defaultInitialConnWindowSize))
+       maxRecv := grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
+       return []grpc.DialOption{streamWindow, connWindow, maxRecv}
+}
+
+// node returns the core.Node that buildNode derives from this client's Config.
+func (a *ADSC) node() *core.Node {
+       return buildNode(&a.cfg.Config)
+}
+
+// buildNode returns the core.Node sent to the xDS server for config.
+//
+// The node carries config.Locality and a metadata struct that always contains
+// an "ISTIO_VERSION" entry. When config.Meta is nil a fresh struct holding only
+// the default version is used. Otherwise config.Meta itself becomes the node
+// metadata and the default version is written into it if the key is missing,
+// so the caller's struct is modified.
+func buildNode(config *Config) *core.Node {
+       const defaultVersion = "65536.65536.65536"
+
+       n := &core.Node{
+               // Id:       nodeID(config),
+               Locality: config.Locality,
+       }
+       if config.Meta == nil {
+               n.Metadata = &pstruct.Struct{
+                       Fields: map[string]*pstruct.Value{
+                               "ISTIO_VERSION": {Kind: &pstruct.Value_StringValue{StringValue: defaultVersion}},
+                       },
+               }
+               return n
+       }
+
+       n.Metadata = config.Meta
+       // A Struct created without Fields has a nil map; writing to it would panic.
+       if config.Meta.Fields == nil {
+               config.Meta.Fields = map[string]*pstruct.Value{}
+       }
+       if config.Meta.Fields["ISTIO_VERSION"] == nil {
+               config.Meta.Fields["ISTIO_VERSION"] = &pstruct.Value{Kind: &pstruct.Value_StringValue{StringValue: defaultVersion}}
+       }
+       return n
+}
+
+// handleRecv is the receive loop started by Run. It resets the backoff policy
+// (the connection succeeded) and then reads responses from the ADS stream until
+// Recv returns an error.
+//
+// Each response is first passed to the configured ResponseHandler. A MeshConfig
+// response replaces a.Mesh when it decodes successfully and is not processed
+// further (it is neither recorded in Received nor ACKed). Any other response
+// has its resources decoded and dispatched to handleLDS, handleCDS, handleEDS,
+// handleRDS or handleMCP according to its type URL; resources that fail to
+// decode are logged and skipped. The response is then stored in Received,
+// ACKed, and offered to XDSUpdates without blocking.
+//
+// When Recv fails the error is offered to errChan. With a BackoffPolicy a
+// reconnect is scheduled; without one the client is closed, Updates is
+// drained, "" and nil are sent on Updates and XDSUpdates, and errChan is closed.
+func (a *ADSC) handleRecv() {
+       // We connected, so reset the backoff
+       if a.cfg.BackoffPolicy != nil {
+               a.cfg.BackoffPolicy.Reset()
+       }
+       for {
+               msg, err := a.stream.Recv()
+               if err != nil {
+                       klog.Errorf("connection closed with err: %v", err)
+                       select {
+                       case a.errChan <- err:
+                       default:
+                       }
+                       // if 'reconnect' enabled - schedule a new Run
+                       if a.cfg.BackoffPolicy != nil {
+                               time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
+                       } else {
+                               a.Close()
+                               a.WaitClear()
+                               a.Updates <- ""
+                               a.XDSUpdates <- nil
+                               close(a.errChan)
+                       }
+                       return
+               }
+
+               // Group-value-kind - used for high level api generator.
+               resourceGvk, isMCP := convertTypeURLToMCPGVK(msg.TypeUrl)
+
+               // TODO WithLabels
+               if a.cfg.ResponseHandler != nil {
+                       a.cfg.ResponseHandler.HandleResponse(a, msg)
+               }
+
+               if msg.TypeUrl == gvk.MeshConfig.String() && len(msg.Resources) > 0 {
+                       m := &v1alpha1.MeshConfig{}
+                       if err := proto.Unmarshal(msg.Resources[0].Value, m); err != nil {
+                               // Keep the previous mesh config rather than storing a partially decoded one.
+                               klog.Errorf("Failed to unmarshal mesh config: %v", err)
+                       } else {
+                               a.Mesh = m
+                       }
+                       continue
+               }
+
+               // Process the resources.
+               a.VersionInfo[msg.TypeUrl] = msg.VersionInfo
+               switch msg.TypeUrl {
+               case v3.ListenerType:
+                       listeners := make([]*listener.Listener, 0, len(msg.Resources))
+                       for _, rsc := range msg.Resources {
+                               ll := &listener.Listener{}
+                               if err := proto.Unmarshal(rsc.Value, ll); err != nil {
+                                       klog.Errorf("Failed to unmarshal listener: %v", err)
+                                       continue
+                               }
+                               listeners = append(listeners, ll)
+                       }
+                       a.handleLDS(listeners)
+               case v3.ClusterType:
+                       clusters := make([]*cluster.Cluster, 0, len(msg.Resources))
+                       for _, rsc := range msg.Resources {
+                               cl := &cluster.Cluster{}
+                               if err := proto.Unmarshal(rsc.Value, cl); err != nil {
+                                       klog.Errorf("Failed to unmarshal cluster: %v", err)
+                                       continue
+                               }
+                               clusters = append(clusters, cl)
+                       }
+                       a.handleCDS(clusters)
+               case v3.EndpointType:
+                       eds := make([]*endpoint.ClusterLoadAssignment, 0, len(msg.Resources))
+                       for _, rsc := range msg.Resources {
+                               el := &endpoint.ClusterLoadAssignment{}
+                               if err := proto.Unmarshal(rsc.Value, el); err != nil {
+                                       klog.Errorf("Failed to unmarshal cluster load assignment: %v", err)
+                                       continue
+                               }
+                               eds = append(eds, el)
+                       }
+                       a.handleEDS(eds)
+               case v3.RouteType:
+                       routes := make([]*route.RouteConfiguration, 0, len(msg.Resources))
+                       for _, rsc := range msg.Resources {
+                               rl := &route.RouteConfiguration{}
+                               if err := proto.Unmarshal(rsc.Value, rl); err != nil {
+                                       klog.Errorf("Failed to unmarshal route configuration: %v", err)
+                                       continue
+                               }
+                               routes = append(routes, rl)
+                       }
+                       a.handleRDS(routes)
+               default:
+                       if isMCP {
+                               a.handleMCP(resourceGvk, msg.Resources)
+                       }
+               }
+
+               // If we got no resource - still save to the store with empty name/namespace, to notify sync
+               // This scheme also allows us to chunk large responses !
+
+               // TODO: add hook to inject nacks
+
+               a.mutex.Lock()
+               if isMCP {
+                       // Only the first response per GVK is timestamped; HasSynced checks for presence.
+                       if _, exist := a.sync[resourceGvk.String()]; !exist {
+                               a.sync[resourceGvk.String()] = time.Now()
+                       }
+               }
+               a.Received[msg.TypeUrl] = msg
+               a.ack(msg)
+               a.mutex.Unlock()
+
+               // Never block the receive loop on a slow or absent XDSUpdates consumer.
+               select {
+               case a.XDSUpdates <- msg:
+               default:
+               }
+       }
+}
+
+// WaitClear drains every event currently buffered in Updates without
+// blocking, so that the next receive only observes pushes arriving afterwards.
+func (a *ADSC) WaitClear() {
+       for drained := false; !drained; {
+               select {
+               case <-a.Updates:
+               default:
+                       drained = true
+               }
+       }
+}
+
+// Dial connects to an ADS server using the client's Config, with optional mTLS
+// authentication if a cert dir or secret manager is configured. On success the
+// connection is stored on the client; on failure the error is returned and the
+// previously stored connection, if any, is left untouched.
+func (a *ADSC) Dial() error {
+       cc, dialErr := dialWithConfig(context.Background(), &a.cfg.Config)
+       if dialErr == nil {
+               a.conn = cc
+       }
+       return dialErr
+}
+
+// Send performs a raw send of req on the ADS stream. The request is modified
+// before sending: the node is attached if this is the first send since Run
+// armed sendNodeMeta, and ResponseNonce is overwritten with the current time.
+// It returns the error from the stream's Send.
+func (a *ADSC) Send(req *discovery.DiscoveryRequest) error {
+       if attachNode := a.sendNodeMeta; attachNode {
+               a.sendNodeMeta = false
+               req.Node = a.node()
+       }
+       nonce := time.Now().String()
+       req.ResponseNonce = nonce
+       // if adscLog.DebugEnabled() {
+       //      strReq, _ := protomarshal.ToJSONWithIndent(req, "  ")
+       //      adscLog.Debugf("Sending Discovery Request to istiod: %s", strReq)
+       // }
+       return a.stream.Send(req)
+}
+
+// HasSynced reports whether a response has been received for every MCP type
+// listed in InitialDiscoveryRequests. Requests whose type URL is not an MCP
+// type are ignored. It returns true when there is no config or no initial
+// request to wait for.
+func (a *ADSC) HasSynced() bool {
+       if a.cfg == nil || len(a.cfg.InitialDiscoveryRequests) == 0 {
+               return true
+       }
+
+       a.mutex.RLock()
+       defer a.mutex.RUnlock()
+
+       for _, req := range a.cfg.InitialDiscoveryRequests {
+               resourceGvk, isMCP := convertTypeURLToMCPGVK(req.TypeUrl)
+               if !isMCP {
+                       continue
+               }
+
+               // handleRecv records sync times under resourceGvk.String(); derive the
+               // lookup key the same way instead of relying on the raw type URL
+               // being identical to that string.
+               if _, ok := a.sync[resourceGvk.String()]; !ok {
+                       return false
+               }
+       }
+
+       return true
+}
+
+// Run opens a new ADS stream on the existing gRPC client connection, sends
+// the initial xDS requests and starts a goroutine that receives and handles
+// xDS responses.
+// Note: it is non blocking. Only a failure to open the stream is returned.
+func (a *ADSC) Run() error {
+       a.client = discovery.NewAggregatedDiscoveryServiceClient(a.conn)
+       stream, err := a.client.StreamAggregatedResources(context.Background())
+       a.stream = stream
+       if err != nil {
+               return err
+       }
+
+       // Fresh stream: the first request must carry the node again.
+       a.sendNodeMeta, a.initialLoad, a.initialLds = true, 0, false
+
+       // Send the initial requests
+       for _, req := range a.cfg.InitialDiscoveryRequests {
+               if req.TypeUrl == v3.ClusterType {
+                       a.watchTime = time.Now()
+               }
+               // Send errors are dropped here; presumably a broken stream also
+               // surfaces through Recv in handleRecv - TODO confirm.
+               _ = a.Send(req)
+       }
+
+       go a.handleRecv()
+       return nil
+}
+
+// Close closes the underlying gRPC connection and marks the client as closed
+// so that reconnect stops rescheduling. The connection's close error is ignored.
+func (a *ADSC) Close() {
+       a.mutex.Lock()
+       defer a.mutex.Unlock()
+
+       _ = a.conn.Close()
+       a.closed = true
+}
+
+// setDefaultConfig fills in defaults on config and returns a copy of it.
+// A nil config is treated as an empty Config. The only default applied is
+// Namespace, which becomes "default" when empty. A non-nil config is updated
+// in place as well.
+func setDefaultConfig(config *Config) Config {
+       cfg := config
+       if cfg == nil {
+               cfg = new(Config)
+       }
+       if len(cfg.Namespace) == 0 {
+               cfg.Namespace = "default"
+       }
+       return *cfg
+}
+
+// dialWithConfig dials config.Address with the default dial options followed
+// by config.GrpcOpts. TLS transport credentials built by tlsConfig are added
+// when CertDir or SecretManager is set. Insecure transport credentials are
+// added only when nothing beyond the defaults was configured.
+func dialWithConfig(ctx context.Context, config *Config) (*grpc.ClientConn, error) {
+       defaults := defaultGrpcDialOptions()
+       opts := make([]grpc.DialOption, 0, len(defaults)+len(config.GrpcOpts)+1)
+       opts = append(opts, defaults...)
+       opts = append(opts, config.GrpcOpts...)
+
+       // If we need MTLS - CertDir or Secrets provider is set.
+       needsTLS := config.CertDir != "" || config.SecretManager != nil
+       if needsTLS {
+               tlsCfg, tlsErr := tlsConfig(config)
+               if tlsErr != nil {
+                       return nil, tlsErr
+               }
+               opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg)))
+       }
+
+       // Only disable transport security if the user didn't supply custom dial options
+       if len(opts) == len(defaults) {
+               opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
+       }
+
+       cc, dialErr := grpc.DialContext(ctx, config.Address, opts...)
+       if dialErr != nil {
+               return nil, dialErr
+       }
+       return cc, nil
+}
+
+// tlsConfig builds the client TLS configuration for the xDS connection.
+//
+// The root CA is taken from the first source that is set: XDSRootCAFile, then
+// SecretManager, then CertDir ("root-cert.pem" inside it). The expected
+// server name is XDSSAN when set, otherwise the host part of Address.
+//
+// It returns an error when the root CA cannot be loaded or when the loaded
+// bytes contain no parsable PEM certificate.
+func tlsConfig(config *Config) (*tls.Config, error) {
+       var serverCABytes []byte
+       var err error
+
+       getClientCertificate := getClientCertFn(config)
+
+       // Load the root CAs
+       if config.XDSRootCAFile != "" {
+               serverCABytes, err = os.ReadFile(config.XDSRootCAFile)
+               if err != nil {
+                       return nil, err
+               }
+       } else if config.SecretManager != nil {
+               // This is a bit crazy - we could just use the file
+               rootCA, err := config.SecretManager.GenerateSecret(security.RootCertReqResourceName)
+               if err != nil {
+                       return nil, err
+               }
+
+               serverCABytes = rootCA.RootCert
+       } else if config.CertDir != "" {
+               serverCABytes, err = os.ReadFile(config.CertDir + "/root-cert.pem")
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       serverCAs := x509.NewCertPool()
+       if ok := serverCAs.AppendCertsFromPEM(serverCABytes); !ok {
+               // err is always nil at this point, so returning it would hand the caller
+               // a nil *tls.Config together with a nil error.
+               return nil, fmt.Errorf("no valid root CA certificate found for xDS server %q", config.Address)
+       }
+
+       // A split failure leaves shost empty; the error is intentionally ignored.
+       shost, _, _ := net.SplitHostPort(config.Address)
+       if config.XDSSAN != "" {
+               shost = config.XDSSAN
+       }
+
+       // nolint: gosec
+       // it's insecure only when a user explicitly enable insecure mode.
+       return &tls.Config{
+               GetClientCertificate: getClientCertificate,
+               RootCAs:              serverCAs,
+               ServerName:           shost,
+               InsecureSkipVerify:   config.InsecureSkipVerify,
+       }, nil
+}
+
+// ConfigInitialRequests returns one DiscoveryRequest for MeshConfig followed
+// by one for every schema in collections.Sail, each identified by its
+// group/version/kind string as type URL.
+func ConfigInitialRequests() []*discovery.DiscoveryRequest {
+       reqs := make([]*discovery.DiscoveryRequest, 0, len(collections.Sail.All())+1)
+
+       meshReq := &discovery.DiscoveryRequest{TypeUrl: gvk.MeshConfig.String()}
+       reqs = append(reqs, meshReq)
+
+       for _, schema := range collections.Sail.All() {
+               typeURL := schema.GroupVersionKind().String()
+               reqs = append(reqs, &discovery.DiscoveryRequest{TypeUrl: typeURL})
+       }
+
+       return reqs
+}
+
+// reconnect opens a new stream through Run unless the client has been closed.
+// If Run fails, another attempt is scheduled after the next backoff interval.
+func (a *ADSC) reconnect() {
+       a.mutex.RLock()
+       closed := a.closed
+       a.mutex.RUnlock()
+       if closed {
+               return
+       }
+
+       if err := a.Run(); err != nil {
+               time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
+       }
+}
+
+// ack acknowledges msg by sending a DiscoveryRequest that echoes its nonce,
+// type URL and version. For endpoint and route responses the request also
+// lists the currently known EDS cluster names or route names. Debug
+// responses are not acknowledged. The send error is ignored.
+func (a *ADSC) ack(msg *discovery.DiscoveryResponse) {
+       // If the response is for istio.io/debug or istio.io/debug/*,
+       // skip to send ACK.
+       if strings.HasPrefix(msg.TypeUrl, v3.DebugType) {
+               return
+       }
+
+       var names []string
+       switch msg.TypeUrl {
+       case v3.EndpointType:
+               for name := range a.edsClusters {
+                       names = append(names, name)
+               }
+       case v3.RouteType:
+               for name := range a.routes {
+                       names = append(names, name)
+               }
+       }
+
+       ackReq := &discovery.DiscoveryRequest{
+               ResponseNonce: msg.Nonce,
+               TypeUrl:       msg.TypeUrl,
+               Node:          a.node(),
+               VersionInfo:   msg.VersionInfo,
+               ResourceNames: names,
+       }
+       _ = a.stream.Send(ackReq)
+}
+
+// handleMCP reconciles the Store with the full set of MCP resources received
+// for groupVersionKind. Each resource is converted with mcpToSail and then
+// created, or updated when its resource version differs from the stored one
+// or is empty. Stored configs of this kind that were not in the response are
+// deleted. Failures are logged and do not stop processing. It does nothing
+// when no Store is configured.
+func (a *ADSC) handleMCP(groupVersionKind config.GroupVersionKind, resources []*anypb.Any) {
+       // Generic - fill up the store
+       if a.Store == nil {
+               return
+       }
+
+       previous := a.Store.List(groupVersionKind, "")
+
+       seen := make(map[string]*config.Config)
+       for _, rsc := range resources {
+               res := &mcp.Resource{}
+               if err := rsc.UnmarshalTo(res); err != nil {
+                       klog.Errorf("Error unmarshalling received MCP config %v", err)
+                       continue
+               }
+               incoming, err := a.mcpToSail(res)
+               if err != nil {
+                       klog.Errorf("Invalid data: %v (%v)", err, string(rsc.Value))
+                       continue
+               }
+               if incoming == nil {
+                       continue
+               }
+               seen[incoming.Namespace+"/"+incoming.Name] = incoming
+
+               incoming.GroupVersionKind = groupVersionKind
+               current := a.Store.Get(incoming.GroupVersionKind, incoming.Name, incoming.Namespace)
+
+               switch {
+               case current == nil:
+                       if _, err = a.Store.Create(*incoming); err != nil {
+                               klog.Errorf("Error adding a new resource to the store %v", err)
+                       }
+               case current.ResourceVersion != incoming.ResourceVersion || incoming.ResourceVersion == "":
+                       // update the store only when resource version differs or unset.
+                       // newCfg.Annotations[mem.ResourceVersion] = newCfg.ResourceVersion
+                       incoming.ResourceVersion = current.ResourceVersion
+                       if _, err = a.Store.Update(*incoming); err != nil {
+                               klog.Errorf("Error updating an existing resource in the store %v", err)
+                       }
+               }
+       }
+
+       // remove deleted resources from cache
+       for _, stale := range previous {
+               if _, kept := seen[stale.Namespace+"/"+stale.Name]; kept {
+                       continue
+               }
+               if err := a.Store.Delete(stale.GroupVersionKind, stale.Name, stale.Namespace, nil); err != nil {
+                       klog.Errorf("Error deleting an outdated resource from the store %v", err)
+               }
+       }
+}
+
+// mcpToSail converts an MCP resource into a config.Config.
+//
+// It returns an empty Config when m or its metadata is nil, and (nil, nil)
+// when the object does not belong to the client's revision. The metadata name
+// must have the form "namespace/name"; anything else is an error, as is a
+// body that cannot be unmarshalled.
+func (a *ADSC) mcpToSail(m *mcp.Resource) (*config.Config, error) {
+       if m == nil || m.Metadata == nil {
+               return &config.Config{}, nil
+       }
+       meta := m.Metadata
+       c := &config.Config{
+               Meta: config.Meta{
+                       ResourceVersion: meta.Version,
+                       Labels:          meta.Labels,
+                       Annotations:     meta.Annotations,
+               },
+       }
+
+       // In case upstream does not support rev in node meta.
+       if !config.ObjectInRevision(c, a.cfg.Revision) {
+               return nil, nil
+       }
+
+       if c.Meta.Annotations == nil {
+               c.Meta.Annotations = make(map[string]string)
+       }
+       parts := strings.Split(meta.Name, "/")
+       if len(parts) != 2 {
+               return nil, fmt.Errorf("invalid name %s", meta.Name)
+       }
+       c.Namespace, c.Name = parts[0], parts[1]
+       c.CreationTimestamp = meta.CreateTime.AsTime()
+
+       spec, err := m.Body.UnmarshalNew()
+       if err != nil {
+               return nil, err
+       }
+       c.Spec = spec
+       return c, nil
+}
+
+// handleCDS processes a full set of clusters. Clusters with an explicit
+// discovery type other than EDS are stored in a.clusters; all others are
+// treated as EDS clusters, stored in a.edsClusters, and their endpoints are
+// requested. A cluster update is then offered on Updates without blocking.
+func (a *ADSC) handleCDS(ll []*cluster.Cluster) {
+       edsNames := make([]string, 0, len(ll))
+       totalSize := 0
+       edsByName := map[string]*cluster.Cluster{}
+       otherByName := map[string]*cluster.Cluster{}
+       for _, c := range ll {
+               totalSize += proto.Size(c)
+               if t, ok := c.ClusterDiscoveryType.(*cluster.Cluster_Type); ok && t.Type != cluster.Cluster_EDS {
+                       otherByName[c.Name] = c
+                       continue
+               }
+               edsNames = append(edsNames, c.Name)
+               edsByName[c.Name] = c
+       }
+
+       klog.Infof("CDS: %d size=%d", len(edsNames), totalSize)
+
+       if len(edsNames) > 0 {
+               a.sendRsc(v3.EndpointType, edsNames)
+       }
+
+       a.mutex.Lock()
+       defer a.mutex.Unlock()
+       a.edsClusters = edsByName
+       a.clusters = otherByName
+
+       select {
+       case a.Updates <- v3.ClusterType:
+       default:
+       }
+}
+
+// handleEDS stores a full set of cluster load assignments keyed by cluster
+// name. On the first endpoint response of a stream it also sends the initial
+// listener request. An endpoint update is then offered on Updates without
+// blocking.
+func (a *ADSC) handleEDS(eds []*endpoint.ClusterLoadAssignment) {
+       byCluster := make(map[string]*endpoint.ClusterLoadAssignment)
+       var totalSize, epCount int
+       for _, cla := range eds {
+               totalSize += proto.Size(cla)
+               byCluster[cla.ClusterName] = cla
+               epCount += len(cla.Endpoints)
+       }
+
+       klog.Infof("eds: %d size=%d ep=%d", len(eds), totalSize, epCount)
+       if firstLoad := a.initialLoad == 0 && !a.initialLds; firstLoad {
+               // first load - Envoy loads listeners after endpoints
+               ldsReq := &discovery.DiscoveryRequest{
+                       Node:    a.node(),
+                       TypeUrl: v3.ListenerType,
+               }
+               _ = a.stream.Send(ldsReq)
+               a.initialLds = true
+       }
+
+       a.mutex.Lock()
+       defer a.mutex.Unlock()
+       a.eds = byCluster
+
+       select {
+       case a.Updates <- v3.EndpointType:
+       default:
+       }
+}
+
+// handleLDS records the listeners from an LDS response, split into HTTP and
+// TCP listeners according to their terminal network filter, requests the
+// routes referenced by the HTTP listeners and signals the update on a.Updates
+// without blocking.
+//
+// Listeners that carry an ApiListener are skipped. Listeners without any
+// filter chain, or whose selected filter chain has no filters, are logged and
+// skipped instead of crashing the client with an out-of-range index.
+// nolint: staticcheck
+func (a *ADSC) handleLDS(ll []*listener.Listener) {
+       lh := map[string]*listener.Listener{}
+       lt := map[string]*listener.Listener{}
+
+       routes := []string{}
+       ldsSize := 0
+
+       for _, l := range ll {
+               ldsSize += proto.Size(l)
+
+               if l.ApiListener != nil {
+                       // This is an API Listener
+                       // TODO: extract VIP and RDS or cluster
+                       continue
+               }
+
+               chains := l.FilterChains
+               if len(chains) == 0 {
+                       klog.Warningf("LDS: listener %s has no filter chains, skipping", l.Name)
+                       continue
+               }
+
+               // The last filter chain is the actual destination for inbound listener,
+               // unless it is the passthrough chain: then the destination is the next
+               // to the last chain (when there is one).
+               fc := chains[len(chains)-1]
+               if fc.GetName() == util.PassthroughFilterChain && len(chains) > 1 {
+                       fc = chains[len(chains)-2]
+               }
+               if len(fc.Filters) == 0 {
+                       klog.Warningf("LDS: listener %s has a filter chain without filters, skipping", l.Name)
+                       continue
+               }
+               // Find the terminal filter
+               filter := fc.Filters[len(fc.Filters)-1]
+
+               switch filter.Name {
+               case wellknown.TCPProxy:
+                       lt[l.Name] = l
+                       config, err := protomarshal.MessageToStructSlow(filter.GetTypedConfig())
+                       if err != nil {
+                               // Only the debug log below needs the decoded config.
+                               klog.Warningf("LDS: failed to decode tcp proxy config of listener %s: %v", l.Name, err)
+                               break
+                       }
+                       c := config.Fields["cluster"].GetStringValue()
+                       klog.V(2).Infof("TCP: %s -> %s", l.Name, c)
+               case wellknown.HTTPConnectionManager:
+                       lh[l.Name] = l
+
+                       // Getting from config is too painful..
+                       port := l.Address.GetSocketAddress().GetPortValue()
+                       if port == 15002 {
+                               routes = append(routes, "http_proxy")
+                       } else {
+                               routes = append(routes, fmt.Sprintf("%d", port))
+                       }
+               default:
+                       klog.Infof(protomarshal.ToJSONWithIndent(l, "  "))
+               }
+       }
+
+       klog.Infof("LDS: http=%d tcp=%d size=%d", len(lh), len(lt), ldsSize)
+
+       a.mutex.Lock()
+       defer a.mutex.Unlock()
+       if len(routes) > 0 {
+               a.sendRsc(v3.RouteType, routes)
+       }
+       a.httpListeners = lh
+       a.tcpListeners = lt
+
+       // Non-blocking notification: skipped if the channel cannot accept it now.
+       select {
+       case a.Updates <- v3.ListenerType:
+       default:
+       }
+}
+
+// sendRsc sends a DiscoveryRequest for the resource names rsc of type typeurl.
+// If a response of that type is recorded in a.Received, its version and nonce
+// are echoed back in the request.
+//
+// The method has no error result, so a failed send is logged instead of being
+// silently dropped.
+func (a *ADSC) sendRsc(typeurl string, rsc []string) {
+       req := &discovery.DiscoveryRequest{
+               Node:          a.node(),
+               TypeUrl:       typeurl,
+               ResourceNames: rsc,
+       }
+       if ex := a.Received[typeurl]; ex != nil {
+               req.VersionInfo = ex.VersionInfo
+               req.ResponseNonce = ex.Nonce
+       }
+       if err := a.stream.Send(req); err != nil {
+               klog.Warningf("failed to send %s request: %v", typeurl, err)
+       }
+}
+
+func (a *ADSC) handleRDS(configurations []*route.RouteConfiguration) {
+       vh := 0
+       rcount := 0
+       size := 0
+
+       rds := map[string]*route.RouteConfiguration{}
+
+       for _, r := range configurations {
+               for _, h := range r.VirtualHosts {
+                       vh++
+                       for _, rt := range h.Routes {
+                               rcount++
+                               // Example: match:<prefix:"/" > 
route:<cluster:"outbound|9154||load-se-154.local" ...
+                               klog.V(2).Infof("Handle route %v, path %v, 
cluster %v", h.Name, rt.Match.PathSpecifier, rt.GetRoute().GetCluster())
+                       }
+               }
+               rds[r.Name] = r
+               size += proto.Size(r)
+       }
+       if a.initialLoad == 0 {
+               a.initialLoad = time.Since(a.watchTime)
+               klog.Infof("RDS: %d size=%d vhosts=%d routes=%d time=%d", 
len(configurations), size, vh, rcount, a.initialLoad)
+       } else {
+               klog.Infof("RDS: %d size=%d vhosts=%d routes=%d", 
len(configurations), size, vh, rcount)
+       }
+
+       a.mutex.Lock()
+       a.routes = rds
+       a.mutex.Unlock()
+
+       select {
+       case a.Updates <- v3.RouteType:
+       default:
+       }
+}
diff --git a/pkg/adsc/util.go b/pkg/adsc/util.go
new file mode 100644
index 00000000..48945b87
--- /dev/null
+++ b/pkg/adsc/util.go
@@ -0,0 +1,57 @@
+package adsc
+
+import (
+       "crypto/tls"
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       "strings"
+)
+
+// getClientCertFn returns a callback that supplies the client certificate for
+// a TLS handshake, or nil when config provides no certificate source.
+//
+// A configured SecretManager takes precedence: the certificate is built from
+// the secret it generates for the workload key/cert resource. Otherwise, when
+// CertDir is set, cert-chain.pem and key.pem are loaded from that directory on
+// every call.
+func getClientCertFn(config *Config) func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
+       switch {
+       case config.SecretManager != nil:
+               return func(_ *tls.CertificateRequestInfo) (*tls.Certificate, error) {
+                       secret, err := config.SecretManager.GenerateSecret(security.WorkloadKeyCertResourceName)
+                       if err != nil {
+                               return nil, err
+                       }
+                       pair, err := tls.X509KeyPair(secret.CertificateChain, secret.PrivateKey)
+                       if err != nil {
+                               return nil, err
+                       }
+                       return &pair, nil
+               }
+       case config.CertDir != "":
+               return func(_ *tls.CertificateRequestInfo) (*tls.Certificate, error) {
+                       pair, err := tls.LoadX509KeyPair(config.CertDir+"/cert-chain.pem", config.CertDir+"/key.pem")
+                       if err != nil {
+                               return nil, err
+                       }
+                       return &pair, nil
+               }
+       default:
+               return nil
+       }
+}
+
+// convertTypeURLToMCPGVK interprets typeURL as "group/version/kind" and
+// returns the corresponding GroupVersionKind when it is registered in
+// collections.Sail. It returns the zero value and false when typeURL does not
+// split into three parts or the GVK is not registered.
+func convertTypeURLToMCPGVK(typeURL string) (config.GroupVersionKind, bool) {
+       segments := strings.SplitN(typeURL, "/", 3)
+       if len(segments) != 3 {
+               return config.GroupVersionKind{}, false
+       }
+
+       candidate := config.GroupVersionKind{Group: segments[0], Version: segments[1], Kind: segments[2]}
+       if _, known := collections.Sail.FindByGroupVersionKind(candidate); !known {
+               return config.GroupVersionKind{}, false
+       }
+       return candidate, true
+}
diff --git a/pkg/config/schema/collection/schemas.go 
b/pkg/config/schema/collection/schemas.go
new file mode 100644
index 00000000..bbf9e618
--- /dev/null
+++ b/pkg/config/schema/collection/schemas.go
@@ -0,0 +1,232 @@
+package collection
+
+import (
+       "fmt"
+
+       "github.com/google/go-cmp/cmp"
+       "github.com/hashicorp/go-multierror"
+       "k8s.io/apimachinery/pkg/runtime/schema"
+
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/resource"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+)
+
+// Schemas contains metadata about configuration resources.
+type Schemas struct {
+       // byCollection indexes the schemas by their GroupVersionKind.
+       byCollection map[config.GroupVersionKind]resource.Schema
+       // byAddOrder holds the schemas in the order they were added.
+       byAddOrder   []resource.Schema
+}
+
+// SchemasFor builds a Schemas from the given schemas, in order. Each element
+// is added with MustAdd, so a duplicate causes a panic.
+func SchemasFor(schemas ...resource.Schema) Schemas {
+       builder := NewSchemasBuilder()
+       for i := range schemas {
+               builder.MustAdd(schemas[i])
+       }
+       return builder.Build()
+}
+
+// SchemasBuilder is a builder for the schemas type.
+type SchemasBuilder struct {
+       // schemas is the set being accumulated; Build hands it out.
+       schemas Schemas
+}
+
+// NewSchemasBuilder returns an empty SchemasBuilder whose lookup map is
+// already initialized.
+func NewSchemasBuilder() *SchemasBuilder {
+       return &SchemasBuilder{
+               schemas: Schemas{
+                       byCollection: map[config.GroupVersionKind]resource.Schema{},
+               },
+       }
+}
+
+// Add registers the schema s. It returns an error, and leaves the builder
+// unchanged, if a schema with the same GroupVersionKind was already added.
+func (b *SchemasBuilder) Add(s resource.Schema) error {
+       gvk := s.GroupVersionKind()
+       if _, exists := b.schemas.byCollection[gvk]; exists {
+               return fmt.Errorf("collection already exists: %v", gvk)
+       }
+
+       b.schemas.byAddOrder = append(b.schemas.byAddOrder, s)
+       b.schemas.byCollection[gvk] = s
+       return nil
+}
+
+// MustAdd is like Add but panics on failure. It returns the builder so calls
+// can be chained.
+func (b *SchemasBuilder) MustAdd(s resource.Schema) *SchemasBuilder {
+       err := b.Add(s)
+       if err == nil {
+               return b
+       }
+       panic(fmt.Sprintf("SchemasBuilder.MustAdd: %v", err))
+}
+
+// Build returns the schemas accumulated so far and resets the builder.
+//
+// The builder is reset to a fresh, empty state with an initialized lookup map,
+// so a later Add can neither modify the returned Schemas nor panic by writing
+// to a nil map.
+func (b *SchemasBuilder) Build() Schemas {
+       s := b.schemas
+
+       // Avoid modify after Build.
+       b.schemas = Schemas{
+               byCollection: make(map[config.GroupVersionKind]resource.Schema),
+       }
+
+       return s
+}
+
+// ForEach calls handleSchema for every schema in add order and stops as soon
+// as handleSchema returns true.
+func (s Schemas) ForEach(handleSchema func(resource.Schema) (done bool)) {
+       for i := 0; i < len(s.byAddOrder); i++ {
+               if done := handleSchema(s.byAddOrder[i]); done {
+                       break
+               }
+       }
+}
+
+func (s Schemas) Union(otherSchemas Schemas) Schemas {
+       resultBuilder := NewSchemasBuilder()
+       for _, myschema := range s.All() {
+               // an error indicates the schema has already been added, which 
doesn't negatively impact intersect
+               _ = resultBuilder.Add(myschema)
+       }
+       for _, myschema := range otherSchemas.All() {
+               // an error indicates the schema has already been added, which 
doesn't negatively impact intersect
+               _ = resultBuilder.Add(myschema)
+       }
+       return resultBuilder.Build()
+}
+
+// Intersect returns a new Schemas holding the schemas of otherSchemas, in
+// their order, whose String() form also occurs among the schemas of s.
+func (s Schemas) Intersect(otherSchemas Schemas) Schemas {
+       mine := sets.String{}
+       for _, sch := range s.All() {
+               mine.Insert(sch.String())
+       }
+
+       common := NewSchemasBuilder()
+       for _, candidate := range otherSchemas.All() {
+               // Skip anything that is not in both sets.
+               if !mine.Contains(candidate.String()) {
+                       continue
+               }
+               _ = common.Add(candidate)
+       }
+       return common.Build()
+}
+
+// FindByGroupVersionKind returns the schema registered for the given GVK, or
+// (nil, false) if there is none.
+//
+// SchemasBuilder.Add rejects duplicate GVKs and records every schema in both
+// byCollection and byAddOrder, so the map lookup finds the same schema a scan
+// of byAddOrder would, without the linear walk.
+func (s Schemas) FindByGroupVersionKind(gvk config.GroupVersionKind) (resource.Schema, bool) {
+       rs, found := s.byCollection[gvk]
+       return rs, found
+}
+
+// FindByGroupVersionAliasesKind returns the first schema, in add order, for
+// which gvk equals one of the values reported by GroupVersionAliasKinds.
+func (s Schemas) FindByGroupVersionAliasesKind(gvk config.GroupVersionKind) (resource.Schema, bool) {
+       for i := range s.byAddOrder {
+               candidate := s.byAddOrder[i]
+               for _, known := range candidate.GroupVersionAliasKinds() {
+                       if known != gvk {
+                               continue
+                       }
+                       return candidate, true
+               }
+       }
+       return nil, false
+}
+
+// FindByGroupKind searches and returns the first schema with the given GVK, 
ignoring versions.
+// Generally it's a good idea to use FindByGroupVersionAliasesKind, which 
validates the version as well.
+// FindByGroupKind provides future proofing against versions we don't yet know 
about; given we don't know them, its risky.
+func (s Schemas) FindByGroupKind(gvk config.GroupVersionKind) 
(resource.Schema, bool) {
+       for _, rs := range s.byAddOrder {
+               if rs.Group() == gvk.Group && rs.Kind() == gvk.Kind {
+                       return rs, true
+               }
+       }
+       return nil, false
+}
+
+// FindByGroupVersionResource returns the first schema, in add order, whose
+// GroupVersionResource equals gvr.
+func (s Schemas) FindByGroupVersionResource(gvr schema.GroupVersionResource) (resource.Schema, bool) {
+       for i := range s.byAddOrder {
+               if candidate := s.byAddOrder[i]; candidate.GroupVersionResource() == gvr {
+                       return candidate, true
+               }
+       }
+
+       return nil, false
+}
+
+// All returns all known Schemas, in the order they were added. The result is
+// a clone of the internal slice, so callers may modify it freely.
+func (s Schemas) All() []resource.Schema {
+       return slices.Clone(s.byAddOrder)
+}
+
+// GroupVersionKinds returns the GroupVersionKind of every known schema, in add
+// order. The result is never nil.
+func (s Schemas) GroupVersionKinds() []config.GroupVersionKind {
+       gvks := make([]config.GroupVersionKind, 0, len(s.byAddOrder))
+       for i := range s.byAddOrder {
+               gvks = append(gvks, s.byAddOrder[i].GroupVersionKind())
+       }
+       return gvks
+}
+
+// Add returns a copy of this Schemas extended with toAdd, leaving the receiver
+// untouched. It panics (via MustAdd) if any GroupVersionKind occurs twice.
+func (s Schemas) Add(toAdd ...resource.Schema) Schemas {
+       builder := NewSchemasBuilder()
+       for _, group := range [][]resource.Schema{s.byAddOrder, toAdd} {
+               for _, sch := range group {
+                       builder.MustAdd(sch)
+               }
+       }
+       return builder.Build()
+}
+
+// Remove returns a copy of this Schemas without the schemas that are Equal to
+// any element of toRemove, leaving the receiver untouched.
+func (s Schemas) Remove(toRemove ...resource.Schema) Schemas {
+       builder := NewSchemasBuilder()
+
+keep:
+       for _, existing := range s.byAddOrder {
+               for _, unwanted := range toRemove {
+                       if unwanted.Equal(existing) {
+                               continue keep
+                       }
+               }
+               builder.MustAdd(existing)
+       }
+
+       return builder.Build()
+}
+
+// Kinds returns the distinct resource kinds of all known schemas, sorted.
+func (s Schemas) Kinds() []string {
+       unique := sets.NewWithLength[string](len(s.byAddOrder))
+       for i := range s.byAddOrder {
+               unique.Insert(s.byAddOrder[i].Kind())
+       }
+
+       return slices.Sort(unique.UnsortedList())
+}
+
+// Validate runs Validate on every schema and returns the collected failures as
+// one error, or nil when all schemas are valid.
+func (s Schemas) Validate() (err error) {
+       for i := range s.byAddOrder {
+               merged := multierror.Append(err, s.byAddOrder[i].Validate())
+               err = merged.ErrorOrNil()
+       }
+       return err
+}
+
+// Equal reports whether s and o hold equal schemas in the same add order, as
+// decided by cmp.Equal on the ordered schema slices.
+func (s Schemas) Equal(o Schemas) bool {
+       return cmp.Equal(s.byAddOrder, o.byAddOrder)
+}
diff --git a/pkg/config/schema/collections/collections.go 
b/pkg/config/schema/collections/collections.go
new file mode 100644
index 00000000..d38deddd
--- /dev/null
+++ b/pkg/config/schema/collections/collections.go
@@ -0,0 +1,14 @@
+//go:build !agent
+// +build !agent
+
+package collections
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+)
+
+var (
+       // Sail is the schema set looked up under the name collections.Sail. No
+       // schema is registered on the builder yet, so the set is currently empty.
+       Sail = collection.NewSchemasBuilder().
+               // TODO MustAdd
+               Build()
+)
diff --git a/pkg/config/schema/resource/schema.go 
b/pkg/config/schema/resource/schema.go
new file mode 100644
index 00000000..5012b662
--- /dev/null
+++ b/pkg/config/schema/resource/schema.go
@@ -0,0 +1,369 @@
+package resource
+
+import (
+       "errors"
+       "fmt"
+       "reflect"
+
+       "github.com/hashicorp/go-multierror"
+       "google.golang.org/protobuf/reflect/protoreflect"
+       "google.golang.org/protobuf/reflect/protoregistry"
+       "k8s.io/apimachinery/pkg/runtime/schema"
+
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+       "github.com/apache/dubbo-kubernetes/pkg/config/validation"
+)
+
+// Schema for a resource.
+type Schema interface {
+       fmt.Stringer
+
+       // GroupVersionKind of the resource. This is the only way to uniquely identify a resource.
+       GroupVersionKind() config.GroupVersionKind
+
+       // GroupVersionResource of the resource.
+       GroupVersionResource() schema.GroupVersionResource
+
+       // IsClusterScoped indicates that this resource is cluster-level rather
+       // than scoped to a particular namespace within a cluster.
+       IsClusterScoped() bool
+
+       // IsBuiltin indicates that this resource is builtin (not a CRD)
+       IsBuiltin() bool
+
+       // Identifier returns a unique identifier for the resource
+       Identifier() string
+
+       // Kind for this resource.
+       Kind() string
+
+       // Plural returns the plural form of the Kind.
+       Plural() string
+
+       // Group for this resource.
+       Group() string
+
+       // Version of this resource.
+       Version() string
+
+       // GroupVersionAliasKinds is the GVK of this resource,
+       // but the version is from its version aliases to perform version conversion.
+       GroupVersionAliasKinds() []config.GroupVersionKind
+
+       // APIVersion is a utility that returns a k8s API version string of the form "Group/Version".
+       APIVersion() string
+
+       // Proto returns the protocol buffer type name for this resource.
+       Proto() string
+
+       // ProtoPackage returns the golang package for the protobuf resource.
+       ProtoPackage() string
+
+       // NewInstance returns a new instance of the protocol buffer message for this resource.
+       NewInstance() (config.Spec, error)
+
+       // Status returns the associated status of the schema
+       Status() (config.Status, error)
+
+       // StatusKind returns the Kind of the status field. If unset, the field does not support status.
+       StatusKind() string
+       // StatusPackage returns the golang package of the status type.
+       StatusPackage() string
+
+       // MustNewInstance calls NewInstance and panics if an error occurs.
+       MustNewInstance() config.Spec
+
+       // Validate this schema.
+       Validate() error
+
+       // ValidateConfig validates that the given config message is of the correct type for this schema
+       // and that the contents are valid.
+       ValidateConfig(cfg config.Config) (validation.Warning, error)
+
+       // Equal is a helper function for testing equality between Schema instances. This supports comparison
+       // with the cmp library.
+       Equal(other Schema) bool
+}
+
+// Builder for a Schema. Fill in the fields and call Build, MustBuild or
+// BuildNoValidate to obtain the Schema.
+type Builder struct {
+       // ClusterScoped is true for resource in cluster-level.
+       ClusterScoped bool
+
+       // Synthetic is true for resource that do not actually exist in a cluster
+       Synthetic bool
+
+       // Builtin is true for resources that are builtin (not CRD)
+       Builtin bool
+
+       // Identifier is the unique identifier for the resource
+       Identifier string
+
+       // Kind is the config proto type.
+       Kind string
+
+       // Plural is the type in plural.
+       Plural string
+
+       // Group is the config proto group.
+       Group string
+
+       // Version is the config proto version.
+       Version string
+
+       // VersionAliases is the config proto version aliases.
+       VersionAliases []string
+
+       // Proto refers to the protobuf message type name corresponding to the type
+       Proto string
+
+       // StatusProto is not copied into the built Schema by BuildNoValidate.
+       // NOTE(review): presumably the protobuf message type name of the status — confirm.
+       StatusProto string
+
+       // ReflectType is the type of the go struct
+       ReflectType reflect.Type
+
+       // StatusType is the type of the associated status.
+       StatusType reflect.Type
+
+       // ProtoPackage refers to the name of golang package for the protobuf message.
+       ProtoPackage string
+
+       // StatusPackage refers to the name of the golang status package.
+       StatusPackage string
+
+       // ValidateProto performs validation on protobuf messages based on this schema.
+       // When nil, BuildNoValidate substitutes validation.EmptyValidate.
+       ValidateProto validation.ValidateFunc
+}
+
+// Build assembles the Schema and validates it. It returns the validation error
+// and no Schema when validation fails.
+func (b Builder) Build() (Schema, error) {
+       built := b.BuildNoValidate()
+
+       err := built.Validate()
+       if err == nil {
+               return built, nil
+       }
+       return nil, err
+}
+
+// MustBuild is like Build but panics when the schema fails validation.
+func (b Builder) MustBuild() Schema {
+       built, err := b.Build()
+       if err == nil {
+               return built
+       }
+       panic(fmt.Sprintf("MustBuild: %v", err))
+}
+
+// BuildNoValidate assembles the Schema from the builder's fields without
+// validating them. A nil ValidateProto is replaced by validation.EmptyValidate.
+func (b Builder) BuildNoValidate() Schema {
+       validate := b.ValidateProto
+       if validate == nil {
+               validate = validation.EmptyValidate
+       }
+
+       gvk := config.GroupVersionKind{
+               Group:   b.Group,
+               Version: b.Version,
+               Kind:    b.Kind,
+       }
+
+       return &schemaImpl{
+               gvk:            gvk,
+               apiVersion:     b.Group + "/" + b.Version,
+               plural:         b.Plural,
+               versionAliases: b.VersionAliases,
+               identifier:     b.Identifier,
+               clusterScoped:  b.ClusterScoped,
+               synthetic:      b.Synthetic,
+               builtin:        b.Builtin,
+               proto:          b.Proto,
+               goPackage:      b.ProtoPackage,
+               reflectType:    b.ReflectType,
+               statusType:     b.StatusType,
+               statusPackage:  b.StatusPackage,
+               validateConfig: validate,
+       }
+}
+
+// schemaImpl is the Schema implementation produced by Builder. Its fields are
+// filled in once by BuildNoValidate.
+type schemaImpl struct {
+       clusterScoped  bool
+       builtin        bool
+       gvk            config.GroupVersionKind
+       versionAliases []string
+       plural         string
+       // apiVersion is "Group/Version", precomputed by BuildNoValidate.
+       apiVersion     string
+       proto          string
+       // goPackage is returned by ProtoPackage.
+       goPackage      string
+       validateConfig validation.ValidateFunc
+       // reflectType, when non-nil, is used by NewInstance instead of the proto name.
+       reflectType    reflect.Type
+       statusType     reflect.Type
+       statusPackage  string
+       identifier     string
+       synthetic      bool
+}
+
+// GroupVersionKind returns the GVK the schema was built with.
+func (s *schemaImpl) GroupVersionKind() config.GroupVersionKind {
+       return s.gvk
+}
+
+// GroupVersionResource returns the schema's group and version combined with
+// its plural name as the resource.
+func (s *schemaImpl) GroupVersionResource() schema.GroupVersionResource {
+       return schema.GroupVersionResource{
+               Group:    s.Group(),
+               Version:  s.Version(),
+               Resource: s.Plural(),
+       }
+}
+
+// IsClusterScoped returns the Builder's ClusterScoped flag.
+func (s *schemaImpl) IsClusterScoped() bool {
+       return s.clusterScoped
+}
+
+// IsBuiltin returns the Builder's Builtin flag.
+func (s *schemaImpl) IsBuiltin() bool {
+       return s.builtin
+}
+
+// Identifier returns the identifier the schema was built with.
+func (s *schemaImpl) Identifier() string {
+       return s.identifier
+}
+
+// Kind returns the kind component of the schema's GVK.
+func (s *schemaImpl) Kind() string {
+       return s.gvk.Kind
+}
+
+// Plural returns the plural name the schema was built with.
+func (s *schemaImpl) Plural() string {
+       return s.plural
+}
+
+// Group returns the group component of the schema's GVK.
+func (s *schemaImpl) Group() string {
+       return s.gvk.Group
+}
+
+// Version returns the version component of the schema's GVK.
+func (s *schemaImpl) Version() string {
+       return s.gvk.Version
+}
+
+// GroupVersionAliasKinds returns one GVK per version alias (the schema's GVK
+// with the version replaced by the alias), followed by the schema's own GVK.
+func (s *schemaImpl) GroupVersionAliasKinds() []config.GroupVersionKind {
+       out := make([]config.GroupVersionKind, 0, len(s.versionAliases)+1)
+       for _, alias := range s.versionAliases {
+               aliased := s.gvk
+               aliased.Version = alias
+               out = append(out, aliased)
+       }
+       return append(out, s.GroupVersionKind())
+}
+
+// APIVersion returns the "Group/Version" string computed when the schema was built.
+func (s *schemaImpl) APIVersion() string {
+       return s.apiVersion
+}
+
+// Proto returns the protobuf message type name the schema was built with.
+func (s *schemaImpl) Proto() string {
+       return s.proto
+}
+
+// ProtoPackage returns the Builder's ProtoPackage value.
+func (s *schemaImpl) ProtoPackage() string {
+       return s.goPackage
+}
+
+// StatusPackage returns the Builder's StatusPackage value.
+func (s *schemaImpl) StatusPackage() string {
+       return s.statusPackage
+}
+
+// Validate checks that the kind and the plural name pass labels.IsDNS1123Label
+// and that the schema has either a reflect type or a proto message name that
+// resolves to a registered message. All failures are reported together; the
+// result is nil when every check passes.
+func (s *schemaImpl) Validate() (err error) {
+       var problems *multierror.Error
+       if !labels.IsDNS1123Label(s.Kind()) {
+               problems = multierror.Append(problems, fmt.Errorf("invalid kind: %s", s.Kind()))
+       }
+       if !labels.IsDNS1123Label(s.plural) {
+               problems = multierror.Append(problems, fmt.Errorf("invalid plural for kind %s: %s", s.Kind(), s.plural))
+       }
+       if s.reflectType == nil && getProtoMessageType(s.proto) == nil {
+               problems = multierror.Append(problems, fmt.Errorf("proto message or reflect type not found: %v", s.proto))
+       }
+       return problems.ErrorOrNil()
+}
+
+// String renders the schema as "[Schema](kind, "goPackage", proto)".
+func (s *schemaImpl) String() string {
+       return fmt.Sprintf("[Schema](%s, %q, %s)", s.Kind(), s.goPackage, 
s.proto)
+}
+
+// NewInstance creates a fresh spec value for this schema. The reflect type is
+// used when one was configured; otherwise the message type is looked up by the
+// proto name. It returns an error when no type can be found or when the
+// created value does not implement config.Spec.
+func (s *schemaImpl) NewInstance() (config.Spec, error) {
+       var instance any
+       if s.reflectType != nil {
+               instance = reflect.New(s.reflectType).Interface()
+       } else {
+               // Use proto
+               msgType, err := protoMessageType(protoreflect.FullName(s.proto))
+               if err != nil || msgType == nil {
+                       return nil, errors.New("failed to find reflect type")
+               }
+               instance = msgType.New().Interface()
+       }
+
+       if spec, ok := instance.(config.Spec); ok {
+               return spec, nil
+       }
+       return nil, fmt.Errorf(
+               "newInstance: message is not an instance of config.Spec. kind:%s, type:%v, value:%v",
+               s.Kind(), s.reflectType, instance)
+}
+
+func (s *schemaImpl) Status() (config.Status, error) {
+       statTyp := s.statusType
+       if statTyp == nil {
+               return nil, errors.New("unknown status type")
+       }
+       instance := reflect.New(statTyp).Interface()
+       p, ok := instance.(config.Status)
+       if !ok {
+               return nil, fmt.Errorf("status: statusType not an instance of 
config.Status. type: %v, value: %v", statTyp, instance)
+       }
+       return p, nil
+}
+
+// StatusKind returns the name of the status type, or "" when the schema has no
+// status type.
+func (s *schemaImpl) StatusKind() string {
+       if st := s.statusType; st != nil {
+               return st.Name()
+       }
+       return ""
+}
+
+// MustNewInstance is like NewInstance but panics with the error on failure.
+func (s *schemaImpl) MustNewInstance() config.Spec {
+       spec, err := s.NewInstance()
+       if err == nil {
+               return spec
+       }
+       panic(err)
+}
+
+// ValidateConfig runs the schema's validation function on cfg and returns its
+// warning and error unchanged.
+func (s *schemaImpl) ValidateConfig(cfg config.Config) (validation.Warning, 
error) {
+       return s.validateConfig(cfg)
+}
+
+// Equal reports whether s and o agree on cluster scope, kind, plural, group,
+// version, proto name and proto package. Other attributes are not compared.
+func (s *schemaImpl) Equal(o Schema) bool {
+       switch {
+       case s.IsClusterScoped() != o.IsClusterScoped():
+               return false
+       case s.Kind() != o.Kind():
+               return false
+       case s.Plural() != o.Plural():
+               return false
+       case s.Group() != o.Group():
+               return false
+       case s.Version() != o.Version():
+               return false
+       case s.Proto() != o.Proto():
+               return false
+       case s.ProtoPackage() != o.ProtoPackage():
+               return false
+       }
+       return true
+}
+
+// FromKubernetesGVK copies the group, version and kind of a Kubernetes GVK
+// into a config.GroupVersionKind. in must not be nil.
+func FromKubernetesGVK(in *schema.GroupVersionKind) config.GroupVersionKind {
+       group, version, kind := in.Group, in.Version, in.Kind
+       return config.GroupVersionKind{Group: group, Version: version, Kind: kind}
+}
+
+// getProtoMessageType returns the Go type of the proto message registered
+// under protoMessageName, or nil when the lookup fails.
+func getProtoMessageType(protoMessageName string) reflect.Type {
+       msgType, err := protoMessageType(protoreflect.FullName(protoMessageName))
+       if err == nil && msgType != nil {
+               return reflect.TypeOf(msgType.Zero().Interface())
+       }
+       return nil
+}
+
+// protoMessageType looks up a protobuf message type by its full name in the
+// global type registry.
+// NOTE(review): presumably a variable so the lookup can be swapped out — confirm.
+var protoMessageType = protoregistry.GlobalTypes.FindMessageByName
diff --git a/pkg/config/validation/validation.go 
b/pkg/config/validation/validation.go
index 2aba93ca..8fb7ea7c 100644
--- a/pkg/config/validation/validation.go
+++ b/pkg/config/validation/validation.go
@@ -17,8 +17,21 @@
 
 package validation
 
-import "github.com/apache/dubbo-kubernetes/pkg/config/validation/agent"
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/validation/agent"
+)
+
+var (
+       // EmptyValidate is a validation function that accepts any config: it
+       // returns no warning and no error.
+       EmptyValidate = func(config.Config) (Warning, error) {
+               return nil, nil
+       }
+)
 
 type (
        Warning = agent.Warning
 )
+
+// ValidateFunc defines a validation func for an API proto. It receives the
+// config to check and returns a warning and/or an error.
+type ValidateFunc func(config config.Config) (Warning, error)
diff --git a/pkg/kube/collection/collection.go b/pkg/kube/collection/schemas.go
similarity index 100%
rename from pkg/kube/collection/collection.go
rename to pkg/kube/collection/schemas.go
diff --git a/pkg/kube/krt/core.go b/pkg/kube/krt/core.go
index 4dfa676f..37779049 100644
--- a/pkg/kube/krt/core.go
+++ b/pkg/kube/krt/core.go
@@ -128,3 +128,9 @@ type internalCollection[T any] interface {
        // Create a new index into the collection
        index(name string, extract func(o T) []string) indexer[T]
 }
+
+// Namespacer is an optional interface that can be implemented by collection types.
+// If implemented, this will be used to determine an object's Namespace.
+type Namespacer interface {
+       GetNamespace() string
+}
diff --git a/pkg/kube/krt/debug.go b/pkg/kube/krt/debug.go
index e8a62eeb..3ba58688 100644
--- a/pkg/kube/krt/debug.go
+++ b/pkg/kube/krt/debug.go
@@ -72,3 +72,18 @@ func eraseMap[T any](l map[Key[T]]T) map[string]any {
        }
        return nm
 }
+
+// maybeRegisterCollectionForDebugging appends the collection's name, dump
+// function and uid to the handler's list of debug collections. It does
+// nothing when handler is nil. c must implement internalCollection[T].
+func maybeRegisterCollectionForDebugging[T any](c Collection[T], handler *DebugHandler) {
+       if handler == nil {
+               return
+       }
+       internal := c.(internalCollection[T])
+
+       handler.mu.Lock()
+       defer handler.mu.Unlock()
+       entry := DebugCollection{
+               name: internal.name(),
+               dump: internal.dump,
+               uid:  internal.uid(),
+       }
+       handler.debugCollections = append(handler.debugCollections, entry)
+}
diff --git a/pkg/kube/krt/files/files.go b/pkg/kube/krt/files/files.go
index 9d222dcd..9f4e6d3e 100644
--- a/pkg/kube/krt/files/files.go
+++ b/pkg/kube/krt/files/files.go
@@ -21,7 +21,14 @@ import (
        "fmt"
        "github.com/apache/dubbo-kubernetes/pkg/filewatcher"
        "github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+       "github.com/fsnotify/fsnotify"
        "go.uber.org/atomic"
+       "k8s.io/klog/v2"
+       "os"
+       "path/filepath"
+       "sync"
        "time"
 )
 
@@ -29,12 +36,7 @@ type FileSingleton[T any] struct {
        krt.Singleton[T]
 }
 
-func NewFileSingleton[T any](
-       fileWatcher filewatcher.FileWatcher,
-       filename string,
-       readFile func(filename string) (T, error),
-       opts ...krt.CollectionOption,
-) (FileSingleton[T], error) {
+func NewFileSingleton[T any](fileWatcher filewatcher.FileWatcher, filename 
string, readFile func(filename string) (T, error), opts 
...krt.CollectionOption) (FileSingleton[T], error) {
        cfg, err := readFile(filename)
        if err != nil {
                return FileSingleton[T]{}, err
@@ -61,6 +63,26 @@ func NewFileSingleton[T any](
        return FileSingleton[T]{sc}, nil
 }
 
+// FileCollection is a static krt collection whose contents come from a
+// FolderWatch (see NewFileCollection).
+type FileCollection[T any] struct {
+       krt.StaticCollection[T]
+       // read produces the current contents from the watched folder's state.
+       read func() []T
+}
+
+// NewFileCollection builds a FileCollection from the folder watch w. The
+// collection starts with the current snapshot of w (converted through
+// transform by readSnapshot) and is reset to a new snapshot every time w
+// notifies its subscribers.
+func NewFileCollection[F any, O any](w *FolderWatch[F], transform func(F) *O, opts ...krt.CollectionOption) FileCollection[O] {
+       snapshot := func() []O {
+               return readSnapshot[F, O](w, transform)
+       }
+       static := krt.NewStaticCollection[O](nil, snapshot(), opts...)
+       w.subscribe(func() {
+               static.Reset(snapshot())
+       })
+       return FileCollection[O]{
+               StaticCollection: static,
+               read:             snapshot,
+       }
+}
+
 func watchFile(fileWatcher filewatcher.FileWatcher, file string, stop <-chan 
struct{}, callback func()) {
        _ = fileWatcher.Add(file)
        go func() {
@@ -80,3 +102,168 @@ func watchFile(fileWatcher filewatcher.FileWatcher, file 
string, stop <-chan str
                }
        }()
 }
+
+// supportedExtensions is the set of file extensions readOnce will parse; files
+// with any other extension are skipped.
+var supportedExtensions = sets.New(".yaml", ".yml")
+
+// watchDebounceDelay is how long fileTrigger waits after the first filesystem
+// event before signalling that a reload is needed.
+const watchDebounceDelay = 50 * time.Millisecond
+
+// FolderWatch keeps an in-memory snapshot of the items parsed from the files
+// under a root directory and notifies subscribers after each successful reload.
+type FolderWatch[T any] struct {
+       // root is the directory that is walked and watched.
+       root  string
+       // parse turns the raw contents of one file into zero or more items.
+       parse func([]byte) ([]T, error)
+
+       // mu guards state and callbacks.
+       mu        sync.RWMutex
+       // state is the result of the most recent successful readOnce.
+       state     []T
+       // callbacks are invoked, without arguments, after state is replaced.
+       callbacks []func()
+}
+
+// NewFolderWatch creates a FolderWatch for fileDir, performs an initial
+// synchronous read, and then starts watching the directory until stop is closed.
+// It returns an error only if the initial read fails; a failure to set up the
+// filesystem watch is logged inside watch and does not fail construction.
+func NewFolderWatch[T any](fileDir string, parse func([]byte) ([]T, error), 
stop <-chan struct{}) (*FolderWatch[T], error) {
+       fw := &FolderWatch[T]{root: fileDir, parse: parse}
+       // Read initial state
+       if err := fw.readOnce(); err != nil {
+               return nil, err
+       }
+       fw.watch(stop)
+       return fw, nil
+}
+
+// readOnce walks f.root, parses every regular file that has a supported
+// extension, and replaces the cached state with the concatenated result.
+// Any walk, read or parse error aborts the reload and leaves the previous state
+// untouched. On success every subscribed callback is invoked once.
+func (f *FolderWatch[T]) readOnce() error {
+       var items []T
+
+       walkErr := filepath.Walk(f.root, func(path string, info os.FileInfo, err error) error {
+               if err != nil {
+                       return err
+               }
+               // Skip anything that is not a regular file or has an unsupported extension.
+               if (info.Mode()&os.ModeType) != 0 || !supportedExtensions.Contains(filepath.Ext(path)) {
+                       return nil
+               }
+               raw, err := os.ReadFile(path)
+               if err != nil {
+                       klog.Errorf("Failed to readOnce %s: %v", path, err)
+                       return err
+               }
+               parsed, err := f.parse(raw)
+               if err != nil {
+                       klog.Errorf("Failed to parse %s: %v", path, err)
+                       return err
+               }
+               items = append(items, parsed...)
+               return nil
+       })
+       if walkErr != nil {
+               klog.Errorf("failure during filepath.Walk: %v", walkErr)
+               return walkErr
+       }
+
+       // Swap the state under the lock, but run the callbacks after releasing it
+       // so that they are free to call get().
+       f.mu.Lock()
+       f.state = items
+       subscribers := slices.Clone(f.callbacks)
+       f.mu.Unlock()
+       for _, notify := range subscribers {
+               notify()
+       }
+       return nil
+}
+
+// watch wires the filesystem trigger to readOnce: every signal received from
+// fileTrigger causes a full reload of f.root. If the trigger cannot be set up
+// the error is logged and no reload loop is started. The loop exits when stop
+// is closed.
+func (f *FolderWatch[T]) watch(stop <-chan struct{}) {
+       // Buffered by one so a pending reload signal can be queued while a reload runs.
+       c := make(chan struct{}, 1)
+       if err := f.fileTrigger(c, stop); err != nil {
+               klog.Errorf("Unable to setup FileTrigger for %s: %v", f.root, 
err)
+               return
+       }
+       // Run the reload loop asynchronously.
+       go func() {
+               for {
+                       select {
+                       case <-c:
+                               klog.Infof("Triggering reload of file 
configuration")
+                               if err := f.readOnce(); err != nil {
+                                       klog.Errorf("unable to reload file 
configuration %v: %v", f.root, err)
+                               }
+                       case <-stop:
+                               return
+                       }
+               }
+       }()
+}
+
+// get returns the most recently loaded state under a read lock.
+// The returned slice is the stored slice itself, not a copy.
+func (f *FolderWatch[T]) get() []T {
+       f.mu.RLock()
+       defer f.mu.RUnlock()
+       return f.state
+}
+
+// subscribe registers fn to be called after every successful readOnce.
+// fn is not invoked for the state that already exists at registration time.
+func (f *FolderWatch[T]) subscribe(fn func()) {
+       f.mu.Lock()
+       defer f.mu.Unlock()
+       f.callbacks = append(f.callbacks, fn)
+}
+
+// fileTrigger starts a recursive fsnotify watch rooted at f.root and sends one
+// value on events after filesystem activity, debounced by watchDebounceDelay.
+//
+// It returns an error if the watcher cannot be created or the initial recursive
+// watch cannot be installed; in the latter case the watcher is closed before
+// returning so its OS resources are not leaked. Otherwise a goroutine is
+// started that owns the watcher and closes it when it exits, which happens when
+// stop is closed or the watcher reports an error.
+func (f *FolderWatch[T]) fileTrigger(events chan struct{}, stop <-chan struct{}) error {
+       fs, err := fsnotify.NewWatcher()
+       if err != nil {
+               return err
+       }
+       watcher := recursiveWatcher{fs}
+       if err = watcher.watchRecursive(f.root); err != nil {
+               // No goroutine owns the watcher yet, so release it here.
+               _ = fs.Close()
+               return err
+       }
+       go func() {
+               defer watcher.Close()
+               // debounceC is nil while idle; a nil channel never fires in select.
+               var debounceC <-chan time.Time
+               for {
+                       select {
+                       case <-debounceC:
+                               debounceC = nil
+                               // Keep honouring stop while handing over the signal so that a
+                               // full events channel cannot strand this goroutine forever.
+                               select {
+                               case events <- struct{}{}:
+                               case <-stop:
+                                       klog.Infof("Shutting down file watcher: %v", f.root)
+                                       return
+                               }
+                       case e := <-watcher.Events:
+                               s, err := os.Stat(e.Name)
+                               if err == nil && s != nil && s.IsDir() {
+                                       // If it's a directory, add a watch for it so we see nested files.
+                                       if e.Op&fsnotify.Create != 0 {
+                                               klog.V(2).Infof("add watch for %v: %v", s.Name(), watcher.watchRecursive(e.Name))
+                                       }
+                               }
+                               // Can't stat a deleted directory, so attempt to remove it. If it fails it is not a problem
+                               if e.Op&fsnotify.Remove != 0 {
+                                       _ = watcher.Remove(e.Name)
+                               }
+                               // Start the debounce window on the first event only; later events
+                               // inside the window are folded into the same signal.
+                               if debounceC == nil {
+                                       debounceC = time.After(watchDebounceDelay)
+                               }
+                       case err := <-watcher.Errors:
+                               klog.Errorf("Error watching file trigger: %v %v", f.root, err)
+                               return
+                       case signal := <-stop:
+                               klog.Infof("Shutting down file watcher: %v %v", f.root, signal)
+                               return
+                       }
+               }
+       }()
+       return nil
+}
+
+// readSnapshot returns the watcher's current state passed through transform
+// via slices.MapFilter.
+// NOTE(review): presumably items for which transform returns nil are dropped;
+// confirm against slices.MapFilter.
+func readSnapshot[F any, O any](w *FolderWatch[F], transform func(F) *O) []O {
+       res := w.get()
+       return slices.MapFilter(res, transform)
+}
+
+// recursiveWatcher wraps an fsnotify.Watcher and adds the ability to watch a
+// directory together with all of its subdirectories (see watchRecursive).
+type recursiveWatcher struct {
+       *fsnotify.Watcher
+}
+
+// watchRecursive adds a watch for path and for every directory beneath it.
+// Regular files are not watched individually. The first error encountered,
+// either while walking or while adding a watch, stops the walk and is returned.
+func (m recursiveWatcher) watchRecursive(path string) error {
+       return filepath.Walk(path, func(current string, info os.FileInfo, walkErr error) error {
+               if walkErr != nil {
+                       return walkErr
+               }
+               if !info.IsDir() {
+                       return nil
+               }
+               return m.Watcher.Add(current)
+       })
+}
diff --git a/pkg/kube/krt/index.go b/pkg/kube/krt/index.go
index 5d873ea7..e94b355c 100644
--- a/pkg/kube/krt/index.go
+++ b/pkg/kube/krt/index.go
@@ -23,6 +23,7 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/ptr"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+       "k8s.io/client-go/tools/cache"
 )
 
 type Index[K comparable, O any] interface {
@@ -42,6 +43,13 @@ func (i IndexObject[K, O]) ResourceName() string {
        return toString(i.Key)
 }
 
+// NewNamespaceIndex is a small helper to index a collection by namespace.
+// The index is named cache.NamespaceIndex and maps each object to the single
+// key returned by its GetNamespace method.
+func NewNamespaceIndex[O Namespacer](c Collection[O]) Index[string, O] {
+       return NewIndex(c, cache.NamespaceIndex, func(o O) []string {
+               return []string{o.GetNamespace()}
+       })
+}
+
 func NewIndex[K comparable, O any](
        c Collection[O],
        name string,
diff --git a/pkg/kube/krt/static.go b/pkg/kube/krt/static.go
new file mode 100644
index 00000000..a0b386be
--- /dev/null
+++ b/pkg/kube/krt/static.go
@@ -0,0 +1,251 @@
+package krt
+
+import (
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+       "github.com/apache/dubbo-kubernetes/pkg/maps"
+       "github.com/apache/dubbo-kubernetes/pkg/ptr"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+       "sync"
+)
+
+// StaticCollection is a collection whose contents are set explicitly by the
+// caller (at construction and via Reset) rather than derived from other inputs.
+// It is a thin handle around a shared *staticList, so copies refer to the same
+// underlying state.
+type StaticCollection[T any] struct {
+       *staticList[T]
+}
+
+// staticList holds the state behind a StaticCollection.
+type staticList[T any] struct {
+       // mu guards vals and indexes.
+       mu             sync.RWMutex
+       // vals maps each object's key (as computed by GetKey) to the object.
+       vals           map[string]T
+       // eventHandlers receives the events produced by Reset.
+       eventHandlers  *handlerSet[T]
+       id             collectionUID
+       // stop is passed to handler registrations.
+       stop           <-chan struct{}
+       collectionName string
+       // syncer backs HasSynced and WaitUntilSynced.
+       syncer         Syncer
+       metadata       Metadata
+       // indexes holds the indexes created through index, keyed by index name.
+       indexes        map[string]staticListIndex[T]
+}
+
+// Register subscribes f to per-object events by delegating to
+// registerHandlerAsBatched.
+func (s *staticList[T]) Register(f func(o Event[T])) HandlerRegistration {
+       return registerHandlerAsBatched(s, f)
+}
+
+// name returns the collection's name.
+// nolint: unused // (not true, its to implement an interface)
+func (s *staticList[T]) name() string {
+       return s.collectionName
+}
+
+// uid returns the identifier assigned to the collection at construction.
+// nolint: unused // (not true, its to implement an interface)
+func (s *staticList[T]) uid() collectionUID {
+       return s.id
+}
+
+// dump returns a debugging snapshot of the collection: its current objects
+// grouped by key, plus its sync status.
+// nolint: unused // (not true, its to implement an interface)
+func (s *staticList[T]) dump() CollectionDump {
+       return CollectionDump{
+               Outputs: eraseMap(slices.GroupUnique(s.List(), getTypedKey)),
+               Synced:  s.HasSynced(),
+       }
+}
+
+// index returns the index registered under name, creating it if necessary.
+// A newly created index is populated from the current values by running
+// extract over each of them, and is then stored so that Reset keeps it up to
+// date. If an index with the same name already exists it is returned as is and
+// the supplied extract function is ignored.
+func (s *staticList[T]) index(name string, extract func(o T) []string) 
indexer[T] {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       if idx, ok := s.indexes[name]; ok {
+               return idx
+       }
+
+       idx := staticListIndex[T]{
+               extract: extract,
+               index:   make(map[string]sets.Set[string]),
+               parent:  s,
+       }
+
+       // Seed the index by treating every existing value as an add event.
+       for k, v := range s.vals {
+               idx.update(Event[T]{
+                       Old:   nil,
+                       New:   &v,
+                       Event: controllers.EventAdd,
+               }, k)
+       }
+       s.indexes[name] = idx
+
+       return idx
+}
+
+// augment returns its argument unchanged.
+// nolint: unused // (not true, its to implement an interface)
+func (s *staticList[T]) augment(a any) any {
+       return a
+}
+
+// HasSynced reports the sync status of the Syncer supplied at construction
+// (alwaysSynced when none was supplied).
+func (s *staticList[T]) HasSynced() bool {
+       return s.syncer.HasSynced()
+}
+
+// Synced returns a Syncer that always reports synced.
+// NOTE(review): this ignores s.syncer, which HasSynced and WaitUntilSynced do
+// consult; confirm that the difference is intentional.
+func (s *staticList[T]) Synced() Syncer {
+       // We are always synced in the static collection since the initial
+       // state must be provided upfront
+       return alwaysSynced{}
+}
+
+// GetKey returns a pointer to a copy of the object stored under k, or nil if
+// no such object exists.
+func (s *staticList[T]) GetKey(k string) *T {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
+       if o, f := s.vals[k]; f {
+               return &o
+       }
+       return nil
+}
+
+// Metadata returns the metadata supplied through the collection options, or
+// the zero Metadata value if none was supplied.
+func (s *staticList[T]) Metadata() Metadata {
+       return s.metadata
+}
+
+// WaitUntilSynced delegates to the Syncer supplied at construction
+// (alwaysSynced when none was supplied) and returns its result.
+func (s *staticList[T]) WaitUntilSynced(stop <-chan struct{}) bool {
+       return s.syncer.WaitUntilSynced(stop)
+}
+
+// RegisterBatch subscribes f to batched events. When runExistingState is true
+// the handler is registered together with one add event per object currently
+// in the collection. The write lock is held for the whole registration.
+func (s *staticList[T]) RegisterBatch(f func(o []Event[T]), runExistingState 
bool) HandlerRegistration {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       var objs []Event[T]
+       if runExistingState {
+               for _, v := range s.vals {
+                       objs = append(objs, Event[T]{
+                               New:   &v,
+                               Event: controllers.EventAdd,
+                       })
+               }
+       }
+       return s.eventHandlers.Insert(f, s.Synced(), objs, s.stop)
+}
+
+// List returns all objects currently in the collection. The values come from
+// a map, so the order is unspecified.
+func (s *staticList[T]) List() []T {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
+       return maps.Values(s.vals)
+}
+
+// NewStaticCollection creates a StaticCollection pre-populated with vals.
+//
+// synced provides the collection's sync status; nil means "always synced".
+// Objects are keyed by GetKey, so when vals contains several objects with the
+// same key the last one wins. If no name is given through opts, the collection
+// is named "Static[<type name>]".
+func NewStaticCollection[T any](synced Syncer, vals []T, opts 
...CollectionOption) StaticCollection[T] {
+       o := buildCollectionOptions(opts...)
+       if o.name == "" {
+               o.name = fmt.Sprintf("Static[%v]", ptr.TypeName[T]())
+       }
+
+       res := make(map[string]T, len(vals))
+       for _, v := range vals {
+               res[GetKey(v)] = v
+       }
+
+       if synced == nil {
+               synced = alwaysSynced{}
+       }
+
+       sl := &staticList[T]{
+               eventHandlers:  newHandlerSet[T](),
+               vals:           res,
+               id:             nextUID(),
+               stop:           o.stop,
+               collectionName: o.name,
+               syncer:         synced,
+               indexes:        make(map[string]staticListIndex[T]),
+       }
+
+       if o.metadata != nil {
+               sl.metadata = o.metadata
+       }
+
+       c := StaticCollection[T]{
+               staticList: sl,
+       }
+       maybeRegisterCollectionForDebugging[T](c, o.debugger)
+       return c
+}
+
+// Reset replaces the entire contents of the collection with newState and
+// emits the events describing the difference:
+//   - an update for each key present before and after whose value changed
+//     (as determined by Equal),
+//   - an add for each key that was not present before,
+//   - a delete for each key that is no longer present.
+//
+// Registered indexes are kept in step with those events. Events are
+// distributed while the write lock is still held, and only if there is at
+// least one.
+func (s StaticCollection[T]) Reset(newState []T) {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       var updates []Event[T]
+       nv := map[string]T{}
+       for _, incoming := range newState {
+               k := GetKey(incoming)
+               nv[k] = incoming
+               if old, f := s.vals[k]; f {
+                       if !Equal(old, incoming) {
+                               ev := Event[T]{
+                                       Old:   &old,
+                                       New:   &incoming,
+                                       Event: controllers.EventUpdate,
+                               }
+                               for _, index := range s.indexes {
+                                       index.update(ev, k)
+                               }
+                               updates = append(updates, ev)
+                       }
+               } else {
+                       ev := Event[T]{
+                               New:   &incoming,
+                               Event: controllers.EventAdd,
+                       }
+                       for _, index := range s.indexes {
+                               index.update(ev, k)
+                       }
+                       updates = append(updates, ev)
+               }
+               // Drop handled keys from the old map so that only keys absent from
+               // newState remain in it once this loop finishes.
+               delete(s.vals, k)
+       }
+       // Whatever is left in the old map was not in newState: those are deletions.
+       for k, remaining := range s.vals {
+               for _, index := range s.indexes {
+                       index.delete(remaining, k)
+               }
+               updates = append(updates, Event[T]{
+                       Old:   &remaining,
+                       Event: controllers.EventDelete,
+               })
+       }
+       s.vals = nv
+       if len(updates) > 0 {
+               s.eventHandlers.Distribute(updates, false)
+       }
+}
+
+// staticListIndex is a secondary index over a staticList.
+// nolint: unused // (not true)
+type staticListIndex[T any] struct {
+       // extract returns the index keys an object should be filed under.
+       extract func(o T) []string
+       // index maps an index key to the set of object keys filed under it.
+       index   map[string]sets.Set[string]
+       // parent is the list whose values the index resolves object keys against.
+       parent  *staticList[T]
+}
+
+// update applies one event to the index for the object with key oKey: the
+// entries derived from ev.Old (if any) are removed first, then the entries
+// derived from ev.New (if any) are inserted. It takes no lock itself; the
+// callers in this file hold the parent's write lock.
+func (s staticListIndex[T]) update(ev Event[T], oKey string) {
+       if ev.Old != nil {
+               s.delete(*ev.Old, oKey)
+       }
+       if ev.New != nil {
+               newIndexKeys := s.extract(*ev.New)
+               for _, newIndexKey := range newIndexKeys {
+                       sets.InsertOrNew(s.index, newIndexKey, oKey)
+               }
+       }
+}
+
+// delete removes oKey from every index entry that extract derives from o.
+// NOTE(review): judging by its name, sets.DeleteCleanupLast also drops an
+// index key once its set becomes empty; confirm.
+func (s staticListIndex[T]) delete(o T, oKey string) {
+       oldIndexKeys := s.extract(o)
+       for _, oldIndexKey := range oldIndexKeys {
+               sets.DeleteCleanupLast(s.index, oldIndexKey, oKey)
+       }
+}
+
+// Lookup returns the objects currently filed under the index key, resolved
+// against the parent list under its read lock. Object keys that no longer
+// exist in the parent are skipped. The result is never nil, and its order is
+// unspecified because it comes from iterating a set.
+func (s staticListIndex[T]) Lookup(key string) []T {
+       s.parent.mu.RLock()
+       defer s.parent.mu.RUnlock()
+       keys := s.index[key]
+
+       res := make([]T, 0, len(keys))
+       for k := range keys {
+               v, f := s.parent.vals[k]
+               if !f {
+                       continue
+               }
+               res = append(res, v)
+       }
+       return res
+}
diff --git a/pkg/kube/krt/sync.go b/pkg/kube/krt/sync.go
index acf75dba..f5d3b0f3 100644
--- a/pkg/kube/krt/sync.go
+++ b/pkg/kube/krt/sync.go
@@ -82,3 +82,13 @@ func (c multiSyncer) HasSynced() bool {
        }
        return true
 }
+
+// alwaysSynced is a Syncer that reports synced unconditionally.
+type alwaysSynced struct{}
+
+// WaitUntilSynced returns true immediately; stop is ignored.
+func (c alwaysSynced) WaitUntilSynced(stop <-chan struct{}) bool {
+       return true
+}
+
+// HasSynced always returns true.
+func (c alwaysSynced) HasSynced() bool {
+       return true
+}
diff --git a/pkg/model/xds.go b/pkg/model/xds.go
new file mode 100644
index 00000000..c5eccbf1
--- /dev/null
+++ b/pkg/model/xds.go
@@ -0,0 +1,11 @@
+package model
+
+// Type URLs for xDS resources, plus the identifier used for debug requests.
+const (
+       // APITypePrefix is the common prefix of the type URLs below.
+       APITypePrefix = "type.googleapis.com/"
+       // ClusterType is the type URL of envoy.config.cluster.v3.Cluster.
+       ClusterType   = APITypePrefix + "envoy.config.cluster.v3.Cluster"
+       // ListenerType is the type URL of envoy.config.listener.v3.Listener.
+       ListenerType  = APITypePrefix + "envoy.config.listener.v3.Listener"
+       // EndpointType is the type URL of envoy.config.endpoint.v3.ClusterLoadAssignment.
+       EndpointType  = APITypePrefix + 
"envoy.config.endpoint.v3.ClusterLoadAssignment"
+       // RouteType is the type URL of envoy.config.route.v3.RouteConfiguration.
+       RouteType     = APITypePrefix + 
"envoy.config.route.v3.RouteConfiguration"
+
+       // DebugType is not prefixed with APITypePrefix.
+       // NOTE(review): presumably used for debug/introspection requests; confirm against callers.
+       DebugType = "dubbo.io/debug"
+)
diff --git a/pkg/security/security.go b/pkg/security/security.go
index cbeafbd6..84ab7b62 100644
--- a/pkg/security/security.go
+++ b/pkg/security/security.go
@@ -20,6 +20,15 @@ package security
 import (
        "context"
        "net/http"
+       "time"
+)
+
+const (
+       // RootCertReqResourceName is resource name of discovery request for 
root certificate.
+       RootCertReqResourceName = "ROOTCA"
+       // WorkloadKeyCertResourceName is the resource name of the discovery 
request for workload
+       // identity.
+       WorkloadKeyCertResourceName = "default"
 )
 
 type AuthContext struct {
@@ -34,6 +43,33 @@ type Authenticator interface {
        AuthenticatorType() string
 }
 
+// SecretItem is the cached item in the in-memory secret store.
+type SecretItem struct {
+       // CertificateChain and PrivateKey hold the key/cert material as raw bytes.
+       // NOTE(review): presumably PEM encoded; confirm against the producer.
+       CertificateChain []byte
+       PrivateKey       []byte
+
+       // RootCert holds the root certificate as raw bytes.
+       RootCert []byte
+
+       // ResourceName passed from envoy SDS discovery request.
+       // "ROOTCA" for root cert request, "default" for key/cert request.
+       ResourceName string
+
+       // CreatedTime is a timestamp associated with the item.
+       // NOTE(review): presumably when the item was generated; confirm.
+       CreatedTime time.Time
+
+       // ExpireTime is a timestamp associated with the item.
+       // NOTE(review): presumably when the certificate expires; confirm.
+       ExpireTime time.Time
+}
+
+// SecretManager defines the secrets management interface used by SDS.
+type SecretManager interface {
+       // GenerateSecret generates a new secret for the given resource.
+       //
+       // The current implementation also watches the generated secret and
+       // triggers a callback when it is near expiry. It constructs the SAN based
+       // on the token's 'sub' claim, which is expected to be in the K8S format.
+       // No other JWTs are currently supported due to client logic. If the JWT
+       // is missing or invalid, the resourceName is used.
+       GenerateSecret(resourceName string) (*SecretItem, error)
+}
+
 type AuthSource int
 
 type KubernetesInfo struct {
diff --git a/pkg/wellknown/wellknown.go b/pkg/wellknown/wellknown.go
new file mode 100644
index 00000000..12d50e16
--- /dev/null
+++ b/pkg/wellknown/wellknown.go
@@ -0,0 +1,13 @@
+package wellknown
+
+// Package wellknown contains common names for filters, listeners, etc.
+// copied from envoyproxy/go-control-plane.
+// TODO: remove this package
+
+// Network filter names
+const (
+       // HTTPConnectionManager network filter
+       HTTPConnectionManager = "envoy.filters.network.http_connection_manager"
+       // TCPProxy network filter
+       TCPProxy = "envoy.filters.network.tcp_proxy"
+)
diff --git a/sail/cmd/sail-discovery/app/cmd.go 
b/sail/cmd/sail-discovery/app/cmd.go
index e4ff4834..09e29571 100644
--- a/sail/cmd/sail-discovery/app/cmd.go
+++ b/sail/cmd/sail-discovery/app/cmd.go
@@ -24,7 +24,7 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/ctrlz"
        "github.com/apache/dubbo-kubernetes/sail/pkg/bootstrap"
        "github.com/apache/dubbo-kubernetes/sail/pkg/features"
-       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/providers"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
        "github.com/spf13/cobra"
 )
 
@@ -88,9 +88,9 @@ func addFlags(c *cobra.Command) {
                p.CtrlZOptions = ctrlz.DefaultOptions()
        })
        
c.PersistentFlags().StringSliceVar(&serverArgs.RegistryOptions.Registries, 
"registries",
-               []string{string(providers.Kubernetes)},
+               []string{string(provider.Kubernetes)},
                fmt.Sprintf("Comma separated list of platform service 
registries to read from (choose one or more from {%s})",
-                       providers.Kubernetes))
+                       provider.Kubernetes))
        
c.PersistentFlags().StringVar(&serverArgs.RegistryOptions.ClusterRegistriesNamespace,
 "clusterRegistriesNamespace",
                serverArgs.RegistryOptions.ClusterRegistriesNamespace, 
"Namespace for ConfigMap which stores clusters configs")
        c.PersistentFlags().StringVar(&serverArgs.RegistryOptions.KubeConfig, 
"kubeconfig", "",
diff --git a/sail/pkg/bootstrap/configcontroller.go 
b/sail/pkg/bootstrap/configcontroller.go
index 0ab23e87..af68ff89 100644
--- a/sail/pkg/bootstrap/configcontroller.go
+++ b/sail/pkg/bootstrap/configcontroller.go
@@ -17,8 +17,265 @@
 
 package bootstrap
 
+import (
+       "context"
+       "crypto/tls"
+       "crypto/x509"
+       "encoding/pem"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/adsc"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+       configaggregate 
"github.com/apache/dubbo-kubernetes/sail/pkg/config/aggregate"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/file"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/config/memory"
+       dubboCredentials 
"github.com/apache/dubbo-kubernetes/sail/pkg/credentials"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/credentials/kube"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+       "istio.io/api/networking/v1alpha3"
+       v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/klog/v2"
+       "net/url"
+       "strings"
+)
+
 type ConfigSourceAddressScheme string
 
 const (
+       File       ConfigSourceAddressScheme = "fs"
+       XDS        ConfigSourceAddressScheme = "xds"
        Kubernetes ConfigSourceAddressScheme = "k8s"
 )
+
+// initConfigController selects the configuration source(s), wraps them in an
+// aggregate controller and schedules that controller to run at server start.
+//
+// Exactly one of the following is used, in order of precedence:
+//  1. the mesh config's ConfigSources, when any are defined;
+//  2. the local directory args.RegistryOptions.FileDir, when set;
+//  3. the Kubernetes config store.
+//
+// The resulting stores are aggregated with configaggregate.MakeCache and
+// published as both s.configController and s.environment.ConfigStore.
+func (s *Server) initConfigController(args *SailArgs) error {
+       meshConfig := s.environment.Mesh()
+       if len(meshConfig.ConfigSources) > 0 {
+               // Config sources are listed explicitly in the mesh config.
+               if err := s.initConfigSources(args); err != nil {
+                       return err
+               }
+       } else if args.RegistryOptions.FileDir != "" {
+               // Local files.
+               // NOTE(review): an earlier comment said these should be added even if
+               // other options are specified, but this branch is only reached when
+               // no ConfigSources are defined; confirm the intended behaviour.
+               configController, err := file.NewController(
+                       args.RegistryOptions.FileDir,
+                       args.RegistryOptions.KubeOptions.DomainSuffix,
+                       collections.Sail,
+                       args.RegistryOptions.KubeOptions,
+               )
+               if err != nil {
+                       return err
+               }
+               s.ConfigStores = append(s.ConfigStores, configController)
+       } else {
+               err := s.initK8SConfigStore(args)
+               if err != nil {
+                       return err
+               }
+       }
+
+       // TODO ingress controller
+       // TODO addTerminatingStartFunc
+
+       // Wrap the config controller with a cache.
+       aggregateConfigController, err := 
configaggregate.MakeCache(s.ConfigStores)
+       if err != nil {
+               return err
+       }
+       s.configController = aggregateConfigController
+
+       // Create the config store.
+       s.environment.ConfigStore = aggregateConfigController
+
+       // Defer starting the controller until after the service is created.
+       s.addStartFunc("config controller", func(stop <-chan struct{}) error {
+               go s.configController.Run(stop)
+               return nil
+       })
+
+       return nil
+}
+
+// initConfigSources will process mesh config 'configSources' and initialize
+// associated configs.
+//
+// Each source's address is parsed as a URL and dispatched on its scheme:
+//   - "fs":  a file-based controller rooted at the URL path is appended to
+//     s.ConfigStores; an empty path is an error.
+//   - "xds": an ADS client is dialled against the URL host, backed by an
+//     in-memory controller that is appended to s.ConfigStores.
+//   - "k8s": with an empty or "/" path the local Kubernetes config store is
+//     initialised; any other path is not implemented and is ignored.
+//   - anything else is logged and ignored.
+//
+// The first failure aborts processing. Underlying errors are wrapped with %w
+// so callers can inspect them with errors.Is / errors.As; the rendered
+// messages are unchanged.
+func (s *Server) initConfigSources(args *SailArgs) (err error) {
+       for _, configSource := range s.environment.Mesh().ConfigSources {
+               srcAddress, err := url.Parse(configSource.Address)
+               if err != nil {
+                       return fmt.Errorf("invalid config URL %s %w", configSource.Address, err)
+               }
+               scheme := ConfigSourceAddressScheme(srcAddress.Scheme)
+               switch scheme {
+               case File:
+                       if srcAddress.Path == "" {
+                               return fmt.Errorf("invalid fs config URL %s, contains no file path", configSource.Address)
+                       }
+
+                       configController, err := file.NewController(
+                               srcAddress.Path,
+                               args.RegistryOptions.KubeOptions.DomainSuffix,
+                               collections.Sail,
+                               args.RegistryOptions.KubeOptions,
+                       )
+                       if err != nil {
+                               return err
+                       }
+                       s.ConfigStores = append(s.ConfigStores, configController)
+                       klog.Infof("Started File configSource %s", configSource.Address)
+               case XDS:
+                       transportCredentials, err := s.getTransportCredentials(args, configSource.TlsSettings)
+                       if err != nil {
+                               return fmt.Errorf("failed to read transport credentials from config: %w", err)
+                       }
+                       xdsMCP, err := adsc.New(srcAddress.Host, &adsc.ADSConfig{
+                               InitialDiscoveryRequests: adsc.ConfigInitialRequests(),
+                               Config: adsc.Config{
+                                       Namespace: args.Namespace,
+                                       Workload:  args.PodName,
+                                       Revision:  "", // TODO
+                                       Meta:      nil,
+                                       GrpcOpts: []grpc.DialOption{
+                                               args.KeepaliveOptions.ConvertToClientOption(),
+                                               grpc.WithTransportCredentials(transportCredentials),
+                                       },
+                               },
+                       })
+                       if err != nil {
+                               return fmt.Errorf("failed to dial XDS %s %w", configSource.Address, err)
+                       }
+                       store := memory.Make(collections.Sail)
+                       // TODO: enable namespace filter for memory controller
+                       configController := memory.NewController(store)
+                       // The in-memory controller is only synced once the ADS client is.
+                       configController.RegisterHasSyncedHandler(xdsMCP.HasSynced)
+                       xdsMCP.Store = configController
+                       if err := xdsMCP.Run(); err != nil {
+                               return fmt.Errorf("MCP: failed running %w", err)
+                       }
+                       s.ConfigStores = append(s.ConfigStores, configController)
+                       klog.Infof("Started XDS configSource %s", configSource.Address)
+               case Kubernetes:
+                       if srcAddress.Path == "" || srcAddress.Path == "/" {
+                               if err := s.initK8SConfigStore(args); err != nil {
+                                       klog.Errorf("Error loading k8s: %v", err)
+                                       return err
+                               }
+                               klog.Infof("Started Kubernetes configSource %s", configSource.Address)
+                       } else {
+                               klog.Infof("Not implemented, ignore: %v", configSource.Address)
+                               // TODO: handle k8s:// scheme for remote cluster. Use same mechanism as service registry,
+                               // using the cluster name as key to match a secret.
+                       }
+               default:
+                       klog.Infof("Ignoring unsupported config source: %v", configSource.Address)
+               }
+       }
+       return nil
+}
+
+// initK8SConfigStore is a placeholder for setting up the Kubernetes-backed
+// config store. It currently does nothing and always returns nil, whether or
+// not a Kubernetes client is configured.
+func (s *Server) initK8SConfigStore(args *SailArgs) error {
+       if s.kubeClient == nil {
+               return nil
+       }
+       // TODO
+       return nil
+}
+
+// getTransportCredentials attempts to create credentials.TransportCredentials
+// from ClientTLSSettings in mesh config.
+// Implemented only for SIMPLE_TLS mode; every other mode yields insecure
+// (plaintext) credentials.
+//
+// In SIMPLE mode:
+//   - if a credential name is set, the root certificate and CRL are loaded
+//     from that secret in args.Namespace and written into tlsSettings,
+//     overwriting CaCertificates and CaCrl on the passed-in object;
+//   - if verification is skipped or no CA certificates are configured, a TLS
+//     config without custom roots and without the extra peer verification is
+//     returned;
+//   - otherwise the CA certificates form the root pool and verifyCert is
+//     installed as an additional peer-certificate check.
+//
+// TODO:
+//
+//     Implement for MUTUAL_TLS/DUBBO_MUTUAL_TLS modes
+func (s *Server) getTransportCredentials(args *SailArgs, tlsSettings 
*v1alpha3.ClientTLSSettings) (credentials.TransportCredentials, error) {
+       // TODO ValidateTLS
+
+       switch tlsSettings.GetMode() {
+       case v1alpha3.ClientTLSSettings_SIMPLE:
+               if len(tlsSettings.GetCredentialName()) > 0 {
+                       rootCert, err := 
s.getRootCertFromSecret(tlsSettings.GetCredentialName(), args.Namespace)
+                       if err != nil {
+                               return nil, err
+                       }
+                       tlsSettings.CaCertificates = string(rootCert.Cert)
+                       tlsSettings.CaCrl = string(rootCert.CRL)
+               }
+               if tlsSettings.GetInsecureSkipVerify().GetValue() || 
len(tlsSettings.GetCaCertificates()) == 0 {
+                       return credentials.NewTLS(&tls.Config{
+                               ServerName:         tlsSettings.GetSni(),
+                               InsecureSkipVerify: 
tlsSettings.GetInsecureSkipVerify().GetValue(), // nolint
+                       }), nil
+               }
+               certPool := x509.NewCertPool()
+               if 
!certPool.AppendCertsFromPEM([]byte(tlsSettings.GetCaCertificates())) {
+                       return nil, fmt.Errorf("failed to add ca certificate 
from configSource.tlsSettings to pool")
+               }
+               return credentials.NewTLS(&tls.Config{
+                       ServerName:         tlsSettings.GetSni(),
+                       InsecureSkipVerify: 
tlsSettings.GetInsecureSkipVerify().GetValue(), // nolint
+                       RootCAs:            certPool,
+                       VerifyPeerCertificate: func(rawCerts [][]byte, 
verifiedChains [][]*x509.Certificate) error {
+                               return s.verifyCert(rawCerts, tlsSettings)
+                       },
+               }), nil
+       default:
+               return insecure.NewCredentials(), nil
+       }
+}
+
+// verifyCert checks the first certificate in certs against tlsSettings.
+//
+// When SubjectAltNames is non-empty, at least one of the certificate's DNS
+// names must equal one of them exactly; only DNS SANs are compared. When CaCrl
+// is non-empty it is parsed (PEM or raw DER) and the certificate's serial
+// number must not appear among its revoked entries.
+//
+// An error is returned when certs is empty, the certificate or CRL cannot be
+// parsed, no SAN matches, or the certificate is revoked.
+func (s *Server) verifyCert(certs [][]byte, tlsSettings *v1alpha3.ClientTLSSettings) error {
+       if len(certs) == 0 {
+               return fmt.Errorf("no certificates provided")
+       }
+       leaf, err := x509.ParseCertificate(certs[0])
+       if err != nil {
+               return fmt.Errorf("failed to parse certificate: %w", err)
+       }
+
+       if allowed := tlsSettings.SubjectAltNames; len(allowed) > 0 {
+               matched := false
+       sanSearch:
+               for _, dnsName := range leaf.DNSNames {
+                       for _, want := range allowed {
+                               if dnsName == want {
+                                       matched = true
+                                       break sanSearch
+                               }
+                       }
+               }
+               if !matched {
+                       return fmt.Errorf("no matching SAN found")
+               }
+       }
+
+       if len(tlsSettings.CaCrl) == 0 {
+               return nil
+       }
+
+       // Accept the CRL either PEM-wrapped or as raw DER.
+       der := []byte(strings.TrimSpace(tlsSettings.CaCrl))
+       if block, _ := pem.Decode(der); block != nil {
+               der = block.Bytes
+       }
+       revocations, err := x509.ParseRevocationList(der)
+       if err != nil {
+               return fmt.Errorf("failed to parse CRL: %w", err)
+       }
+       for _, entry := range revocations.RevokedCertificateEntries {
+               if leaf.SerialNumber.Cmp(entry.SerialNumber) == 0 {
+                       return fmt.Errorf("certificate is revoked")
+               }
+       }
+       return nil
+}
+
+// getRootCertFromSecret fetches the secret called name from namespace and
+// returns the root certificate information extracted from its data by
+// kube.ExtractRoot.
+//
+// It returns an error when no Kubernetes client is configured (instead of
+// dereferencing a nil client), when the secret cannot be fetched (the
+// underlying error is wrapped), or when extraction fails.
+func (s *Server) getRootCertFromSecret(name, namespace string) (*dubboCredentials.CertInfo, error) {
+       if s.kubeClient == nil {
+               return nil, fmt.Errorf("failed to get credential with name %v: no kubernetes client available", name)
+       }
+       secret, err := s.kubeClient.Kube().CoreV1().Secrets(namespace).Get(context.Background(), name, v1.GetOptions{})
+       if err != nil {
+               return nil, fmt.Errorf("failed to get credential with name %v: %w", name, err)
+       }
+       return kube.ExtractRoot(secret.Data)
+}
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index 00e4a551..f95cc434 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -34,7 +34,7 @@ import (
        "github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
        "github.com/apache/dubbo-kubernetes/sail/pkg/server"
-       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/providers"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
        tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
        "github.com/apache/dubbo-kubernetes/sail/pkg/xds"
        "github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
@@ -53,21 +53,28 @@ import (
 )
 
 type Server struct {
-       XDSServer           *xds.DiscoveryServer
-       clusterID           cluster.ID
-       environment         *model.Environment
-       server              server.Instance
-       kubeClient          kubelib.Client
-       grpcServer          *grpc.Server
-       grpcAddress         string
-       secureGrpcServer    *grpc.Server
-       secureGrpcAddress   string
-       httpServer          *http.Server // debug, monitoring and readiness 
Server.
-       httpAddr            string
-       httpsServer         *http.Server // webhooks HTTPS Server.
-       httpsAddr           string
-       httpMux             *http.ServeMux
-       httpsMux            *http.ServeMux // webhooks
+       XDSServer   *xds.DiscoveryServer
+       clusterID   cluster.ID
+       environment *model.Environment
+       server      server.Instance
+       kubeClient  kubelib.Client
+
+       grpcServer  *grpc.Server
+       grpcAddress string
+
+       secureGrpcServer  *grpc.Server
+       secureGrpcAddress string
+
+       httpServer  *http.Server // debug, monitoring and readiness Server.
+       httpAddr    string
+       httpsServer *http.Server // webhooks HTTPS Server.
+       httpsAddr   string
+       httpMux     *http.ServeMux
+       httpsMux    *http.ServeMux // webhooks
+
+       ConfigStores     []model.ConfigStoreController
+       configController model.ConfigStoreController
+
        fileWatcher         filewatcher.FileWatcher
        internalStop        chan struct{}
        shutdownDuration    time.Duration
@@ -146,6 +153,10 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server)) 
(*Server, error) {
                return nil, err
        }
 
+       if err := s.initControllers(args); err != nil {
+               return nil, err
+       }
+
        return s, nil
 }
 
@@ -289,6 +300,22 @@ func (s *Server) initServers(args *SailArgs) {
 func (s *Server) initGrpcServer(options *dubbokeepalive.Options) {
 }
 
+// initControllers initializes the config controller first and the service
+// controllers second. The first failure aborts the sequence; it is wrapped
+// with %w so the original cause stays reachable through errors.Is/errors.As.
+func (s *Server) initControllers(args *SailArgs) error {
+       klog.Info("initializing controllers")
+       // TODO initMulticluster
+
+       // TODO initSDSServer
+
+       if err := s.initConfigController(args); err != nil {
+               return fmt.Errorf("error initializing config controller: %w", err)
+       }
+       if err := s.initServiceControllers(args); err != nil {
+               return fmt.Errorf("error initializing service controllers: %w", err)
+       }
+       return nil
+}
+
 func (s *Server) serveHTTP() error {
        // At this point we are ready - start Http Listener so that it can 
respond to readiness events.
        httpListener, err := net.Listen("tcp", s.httpServer.Addr)
@@ -333,11 +360,17 @@ func (s *Server) maybeCreateCA(caOpts *caOptions) error {
        return nil
 }
 
+// addStartFunc registers fn under the given name by handing it to
+// s.server.RunComponent. Per the original note on this helper, registered
+// functions are run synchronously and in order, so fn should start its own
+// goroutine if it needs to do anything blocking.
+func (s *Server) addStartFunc(name string, fn server.Component) {
+       s.server.RunComponent(name, fn)
+}
+
 func getClusterID(args *SailArgs) cluster.ID {
        clusterID := args.RegistryOptions.KubeOptions.ClusterID
        if clusterID == "" {
                if hasKubeRegistry(args.RegistryOptions.Registries) {
-                       clusterID = cluster.ID(providers.Kubernetes)
+                       clusterID = cluster.ID(provider.Kubernetes)
                }
        }
        return clusterID
diff --git a/sail/pkg/bootstrap/servicecontroller.go 
b/sail/pkg/bootstrap/servicecontroller.go
new file mode 100644
index 00000000..44e493a2
--- /dev/null
+++ b/sail/pkg/bootstrap/servicecontroller.go
@@ -0,0 +1,62 @@
+package bootstrap
+
+import (
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+       "k8s.io/klog/v2"
+)
+
+// ServiceController returns the environment's ServiceDiscovery as an
+// *aggregate.Controller. The type assertion is unchecked, so this panics if
+// ServiceDiscovery holds any other type (or is nil).
+func (s *Server) ServiceController() *aggregate.Controller {
+       return s.environment.ServiceDiscovery.(*aggregate.Controller)
+}
+
+// initServiceControllers walks the registries named in args, initializes each
+// distinct one exactly once, and schedules the aggregate service controller to
+// run when the server starts. Only the Kubernetes registry is supported; any
+// other name is reported as an error.
+func (s *Server) initServiceControllers(args *SailArgs) error {
+       aggregateController := s.ServiceController()
+
+       // TODO service entry controller
+
+       seen := sets.New[provider.ID]()
+       for _, r := range args.RegistryOptions.Registries {
+               id := provider.ID(r)
+               if seen.Contains(id) {
+                       // Duplicates are logged and skipped rather than treated as errors.
+                       klog.Infof("%s registry specified multiple times.", r)
+                       continue
+               }
+               seen.Insert(id)
+               klog.Infof("Adding %s registry adapter", id)
+
+               if id != provider.Kubernetes {
+                       return fmt.Errorf("service registry %s is not supported", r)
+               }
+               if err := s.initKubeRegistry(args); err != nil {
+                       return err
+               }
+       }
+
+       // The controller itself is started later, in its own goroutine, so the
+       // start function returns immediately.
+       s.addStartFunc("service controllers", func(stop <-chan struct{}) error {
+               go aggregateController.Run(stop)
+               return nil
+       })
+
+       return nil
+}
+
+// initKubeRegistry prepares the Kubernetes registry options held in args:
+// it copies the cluster ID, KRT debugger, XDS updater, mesh/network watchers,
+// system namespace and aggregate service controller from the server into
+// args.RegistryOptions.KubeOptions. No controller is created yet (see the
+// TODOs), and the named result err is never assigned, so the function
+// currently always returns nil.
+func (s *Server) initKubeRegistry(args *SailArgs) (err error) {
+       args.RegistryOptions.KubeOptions.ClusterID = s.clusterID
+       // TODO reversion
+       args.RegistryOptions.KubeOptions.KrtDebugger = args.KrtDebugger
+       // TODO metrics
+       args.RegistryOptions.KubeOptions.XDSUpdater = s.XDSServer
+       args.RegistryOptions.KubeOptions.MeshNetworksWatcher = 
s.environment.NetworksWatcher
+       args.RegistryOptions.KubeOptions.MeshWatcher = s.environment.Watcher
+       args.RegistryOptions.KubeOptions.SystemNamespace = args.Namespace
+       args.RegistryOptions.KubeOptions.MeshServiceController = 
s.ServiceController()
+       // TODO NewMulticluster
+       return
+}
diff --git a/sail/pkg/bootstrap/util.go b/sail/pkg/bootstrap/util.go
index a4519a93..9d4a15e4 100644
--- a/sail/pkg/bootstrap/util.go
+++ b/sail/pkg/bootstrap/util.go
@@ -18,12 +18,12 @@
 package bootstrap
 
 import (
-       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/providers"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
 )
 
 func hasKubeRegistry(registries []string) bool {
        for _, r := range registries {
-               if providers.ID(r) == providers.Kubernetes {
+               if provider.ID(r) == provider.Kubernetes {
                        return true
                }
        }
diff --git a/sail/pkg/config/aggregate/config.go 
b/sail/pkg/config/aggregate/config.go
new file mode 100644
index 00000000..16dfd442
--- /dev/null
+++ b/sail/pkg/config/aggregate/config.go
@@ -0,0 +1,189 @@
+// Package aggregate implements a read-only aggregator for config stores.
+package aggregate
+
+import (
+       "errors"
+
+       "k8s.io/apimachinery/pkg/types"
+
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+)
+
+var errorUnsupported = errors.New("unsupported operation: the config 
aggregator is read-only")
+
+// makeStore builds an aggregate config store over `stores`. Each kind is
+// mapped to every store that serves it, in input order, and the first store
+// seen for a kind contributes that kind's schema to the unified schema set.
+// `writer` (which may be nil) receives all mutating calls.
+func makeStore(stores []model.ConfigStore, writer model.ConfigStore) (model.ConfigStore, error) {
+       builder := collection.NewSchemasBuilder()
+       byKind := make(map[config.GroupVersionKind][]model.ConfigStore)
+       for _, backend := range stores {
+               for _, schema := range backend.Schemas().All() {
+                       gvk := schema.GroupVersionKind()
+                       // Register the schema only the first time this kind shows up.
+                       if len(byKind[gvk]) == 0 {
+                               if err := builder.Add(schema); err != nil {
+                                       return nil, err
+                               }
+                       }
+                       byKind[gvk] = append(byKind[gvk], backend)
+               }
+       }
+
+       unified := builder.Build()
+       if err := unified.Validate(); err != nil {
+               return nil, err
+       }
+
+       return &store{schemas: unified, stores: byKind, writer: writer}, nil
+}
+
+// MakeWriteableCache aggregates several config store controllers into one.
+// Reads are served from `caches`; writes go to `writer`, which may or may not
+// itself be one of `caches`.
+func MakeWriteableCache(caches []model.ConfigStoreController, writer model.ConfigStore) (model.ConfigStoreController, error) {
+       // Widen each controller to the plain ConfigStore interface makeStore expects.
+       stores := make([]model.ConfigStore, len(caches))
+       for i := range caches {
+               stores[i] = caches[i]
+       }
+
+       aggregated, err := makeStore(stores, writer)
+       if err != nil {
+               return nil, err
+       }
+       return &storeCache{ConfigStore: aggregated, caches: caches}, nil
+}
+
+// MakeCache creates an aggregate config store cache from several config store
+// caches. It passes a nil writer to MakeWriteableCache, so every mutating
+// call on the result fails with errorUnsupported.
+func MakeCache(caches []model.ConfigStoreController) 
(model.ConfigStoreController, error) {
+       return MakeWriteableCache(caches, nil)
+}
+
+// store is the aggregate config store: reads fan out over the per-kind
+// backing stores, writes are delegated to writer.
+type store struct {
+       // schemas is the unified set of schemas served by the backing stores.
+       schemas collection.Schemas
+
+       // stores maps each config kind to the stores that serve it.
+       stores map[config.GroupVersionKind][]model.ConfigStore
+
+       // writer handles Create/Update/UpdateStatus/Patch/Delete; when nil those
+       // calls return errorUnsupported.
+       writer model.ConfigStore
+}
+
+// Schemas returns the unified schema set computed when the store was built.
+func (cr *store) Schemas() collection.Schemas {
+       return cr.schemas
+}
+
+// Get asks each store registered for typ, in order, and returns the first
+// non-nil config. It returns nil when no store has the config.
+func (cr *store) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
+       for _, backend := range cr.stores[typ] {
+               if found := backend.Get(typ, name, namespace); found != nil {
+                       return found
+               }
+       }
+       return nil
+}
+
+// List returns the configs of kind typ in namespace from every store serving
+// that kind, concatenated in store order. A config is kept only when
+// InsertContains reports false for its namespaced name, which filters
+// repeated names. It returns nil when no store serves typ.
+func (cr *store) List(typ config.GroupVersionKind, namespace string) []config.Config {
+       backends := cr.stores[typ]
+       if len(backends) == 0 {
+               return nil
+       }
+
+       // Query every backend first so the merged slice can be sized exactly.
+       perStore := make([][]config.Config, 0, len(backends))
+       total := 0
+       for _, backend := range backends {
+               listed := backend.List(typ, namespace)
+               perStore = append(perStore, listed)
+               total += len(listed)
+       }
+
+       merged := make([]config.Config, 0, total)
+       for _, listed := range perStore {
+               merged = append(merged, listed...)
+       }
+
+       // Tracks namespaced names already emitted.
+       seen := sets.NewWithLength[types.NamespacedName](total)
+       return slices.FilterInPlace[config.Config](merged, func(cfg config.Config) bool {
+               return !seen.InsertContains(cfg.NamespacedName())
+       })
+}
+
+// Delete forwards the deletion to the writer store. Without a writer the
+// aggregate is read-only and errorUnsupported is returned.
+func (cr *store) Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error {
+       if w := cr.writer; w != nil {
+               return w.Delete(typ, name, namespace, resourceVersion)
+       }
+       return errorUnsupported
+}
+
+// Create forwards c to the writer store and returns its result. Without a
+// writer it returns errorUnsupported.
+func (cr *store) Create(c config.Config) (string, error) {
+       if w := cr.writer; w != nil {
+               return w.Create(c)
+       }
+       return "", errorUnsupported
+}
+
+// Update forwards c to the writer store and returns its result. Without a
+// writer it returns errorUnsupported.
+func (cr *store) Update(c config.Config) (string, error) {
+       if w := cr.writer; w != nil {
+               return w.Update(c)
+       }
+       return "", errorUnsupported
+}
+
+// UpdateStatus forwards c to the writer store and returns its result.
+// Without a writer it returns errorUnsupported.
+func (cr *store) UpdateStatus(c config.Config) (string, error) {
+       if w := cr.writer; w != nil {
+               return w.UpdateStatus(c)
+       }
+       return "", errorUnsupported
+}
+
+// Patch forwards the patch request to the writer store and returns its
+// result. Without a writer it returns errorUnsupported.
+func (cr *store) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) {
+       if w := cr.writer; w != nil {
+               return w.Patch(orig, patchFn)
+       }
+       return "", errorUnsupported
+}
+
+// storeCache combines the aggregate ConfigStore (embedded, serving reads and
+// writes) with the underlying controllers, which are used for sync status,
+// event handler registration and running.
+type storeCache struct {
+       model.ConfigStore
+       caches []model.ConfigStoreController
+}
+
+// HasSynced reports whether every underlying cache has synced. Caches are
+// checked in order and the scan stops at the first one that has not.
+func (cr *storeCache) HasSynced() bool {
+       synced := true
+       for i := 0; synced && i < len(cr.caches); i++ {
+               synced = cr.caches[i].HasSynced()
+       }
+       return synced
+}
+
+// RegisterEventHandler registers handler for kind on every underlying cache
+// whose schema set contains that kind; caches that do not serve the kind are
+// skipped.
+func (cr *storeCache) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) {
+       for _, cache := range cr.caches {
+               _, served := cache.Schemas().FindByGroupVersionKind(kind)
+               if !served {
+                       continue
+               }
+               cache.RegisterEventHandler(kind, handler)
+       }
+}
+
+// Run starts every underlying cache in its own goroutine, handing each the
+// same stop channel, and then blocks until stop is closed.
+func (cr *storeCache) Run(stop <-chan struct{}) {
+       for i := range cr.caches {
+               go cr.caches[i].Run(stop)
+       }
+       <-stop
+}
diff --git a/sail/pkg/config/kube/crd/config.go 
b/sail/pkg/config/kube/crd/config.go
new file mode 100644
index 00000000..7b963013
--- /dev/null
+++ b/sail/pkg/config/kube/crd/config.go
@@ -0,0 +1,13 @@
+package crd
+
+import (
+       "encoding/json"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// DubboKind is the generic decoded form of a config document: Kubernetes
+// type and object metadata plus the spec and optional status kept as raw,
+// undecoded JSON.
+type DubboKind struct {
+       metav1.TypeMeta
+       metav1.ObjectMeta `json:"metadata"`
+       Spec              json.RawMessage  `json:"spec"`
+       Status            *json.RawMessage `json:"status,omitempty"`
+}
diff --git a/sail/pkg/config/kube/crd/conversion.go 
b/sail/pkg/config/kube/crd/conversion.go
new file mode 100644
index 00000000..66b12cb7
--- /dev/null
+++ b/sail/pkg/config/kube/crd/conversion.go
@@ -0,0 +1,57 @@
+package crd
+
+import (
+       "bytes"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/resource"
+       "io"
+       kubeyaml "k8s.io/apimachinery/pkg/util/yaml"
+       "reflect"
+)
+
+// ConversionFunc is the signature of a function that turns the string js
+// into a config.Spec for the given resource schema.
+type ConversionFunc = func(s resource.Schema, js string) (config.Spec, error)
+
+// TODO - add special cases for type-to-kind and kind-to-type
+// conversions with initial-isms. Consider adding additional type
+// information to the abstract model and/or elevating k8s
+// representation to first-class type to avoid extra conversions.
+
+// parseInputsImpl decodes inputs as a stream of YAML or JSON documents into
+// DubboKind values. Decoding stops at io.EOF; any other decode error aborts
+// with an error; documents that decode to the zero DubboKind are skipped.
+//
+// NOTE(review): the conversion step is still a TODO, so nothing is ever
+// appended to varr or others (both are returned nil on success) and
+// withValidate is currently unused.
+func parseInputsImpl(inputs string, withValidate bool) ([]config.Config, 
[]DubboKind, error) {
+       var varr []config.Config
+       var others []DubboKind
+       reader := bytes.NewReader([]byte(inputs))
+       empty := DubboKind{}
+
+       // Configs are stored as a YAML stream, so the input may contain more than
+       // one document; decode them one at a time.
+       yamlDecoder := kubeyaml.NewYAMLOrJSONDecoder(reader, 512*1024)
+       for {
+               obj := DubboKind{}
+               err := yamlDecoder.Decode(&obj)
+               if err == io.EOF {
+                       break
+               }
+               if err != nil {
+                       return nil, nil, fmt.Errorf("cannot parse proto 
message: %v", err)
+               }
+               // Skip empty documents (e.g. a bare separator).
+               if reflect.DeepEqual(obj, empty) {
+                       continue
+               }
+
+               // TODO GatewayAPI
+       }
+
+       return varr, others, nil
+}
+
+// ParseInputs reads multiple documents from `kubectl` output and checks them
+// against the schema. It also returns the list of unrecognized kinds as the
+// second response. It delegates to parseInputsImpl with withValidate set to
+// true.
+//
+// NOTE: This function only decodes a subset of the complete k8s
+// ObjectMeta as identified by the fields in model.Meta. This
+// would typically only be a problem if a user dumps a configuration
+// object with kubectl and then re-ingests it.
+func ParseInputs(inputs string) ([]config.Config, []DubboKind, error) {
+       return parseInputsImpl(inputs, true)
+}
diff --git a/sail/pkg/config/kube/file/controller.go 
b/sail/pkg/config/kube/file/controller.go
new file mode 100644
index 00000000..1a4ffa2a
--- /dev/null
+++ b/sail/pkg/config/kube/file/controller.go
@@ -0,0 +1,192 @@
+package file
+
+import (
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/crd"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+       krtfiles "github.com/apache/dubbo-kubernetes/pkg/kube/krt/files"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       kubecontroller 
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/kube/controller"
+)
+
+var errUnsupportedOp = fmt.Errorf("unsupported operation: the file config 
store is a read-only view")
+
+// kindStore holds everything the file controller keeps for one config kind.
+type kindStore struct {
+       // collection contains the configs of this kind.
+       collection krt.Collection[config.Config]
+       // index is built with krt.NewNamespaceIndex and is used by List to look
+       // configs up by namespace.
+       index      krt.Index[string, config.Config]
+       // handlers are the event handler registrations consulted by HasSynced.
+       handlers   []krt.HandlerRegistration
+}
+
+// Controller is a read-only config store backed by files in a watched
+// directory; all mutating methods return errUnsupportedOp.
+type Controller struct {
+       // data holds the per-kind collections, keyed by GroupVersionKind.
+       data    map[config.GroupVersionKind]kindStore
+       // schemas is the schema set handed to NewController.
+       schemas collection.Schemas
+       // stop is created in NewController and closed by Run when the caller's
+       // stop channel fires.
+       stop    chan struct{}
+}
+
+// ConfigKind wraps a *config.Config so it can carry the ResourceName and
+// Equals methods used by the file collection.
+type ConfigKind struct {
+       *config.Config
+}
+
+// ResourceName returns the key identifying this config:
+// "<gvk>/<name>" when the namespace is empty, "<gvk>/<namespace>/<name>"
+// otherwise.
+//
+// The namespace and name are joined with "/" so that distinct pairs cannot
+// collapse onto the same key (previously namespace "ab" + name "c" and
+// namespace "a" + name "bc" both produced ".../abc").
+func (c ConfigKind) ResourceName() string {
+       if c.Namespace == "" {
+               return c.GroupVersionKind.String() + "/" + c.Name
+       }
+
+       return c.GroupVersionKind.String() + "/" + c.Namespace + "/" + c.Name
+}
+
+// Equals reports whether the two wrapped configs are equal, as decided by
+// config.Config's own Equals method.
+func (c ConfigKind) Equals(other ConfigKind) bool {
+       return c.Config.Equals(other.Config)
+}
+
+// NewController builds a file-backed config controller. It watches fileDir,
+// parses every file with parseInputs (stamping domainSuffix on each config),
+// and derives one collection plus namespace index per kind in schemas that is
+// also present in collections.Sail. It returns the error from
+// krtfiles.NewFolderWatch if the folder watch cannot be created.
+func NewController(
+       fileDir string,
+       domainSuffix string,
+       schemas collection.Schemas,
+       options kubecontroller.Options,
+) (*Controller, error) {
+       stop := make(chan struct{})
+       opts := krt.NewOptionsBuilder(stop, "file-monitor", options.KrtDebugger)
+
+       parse := func(b []byte) ([]*config.Config, error) {
+               return parseInputs(b, domainSuffix)
+       }
+       watch, err := krtfiles.NewFolderWatch(fileDir, parse, stop)
+       if err != nil {
+               return nil, err
+       }
+
+       wrap := func(c *config.Config) *ConfigKind {
+               return &ConfigKind{c}
+       }
+       mainCollection := krtfiles.NewFileCollection(watch, wrap, opts.WithName("main")...)
+
+       data := make(map[config.GroupVersionKind]kindStore)
+       for _, s := range schemas.All() {
+               gvk := s.GroupVersionKind()
+               if _, known := collections.Sail.FindByGroupVersionKind(gvk); !known {
+                       continue
+               }
+
+               // Project the main collection down to the configs of this kind only.
+               perKind := krt.NewCollection(mainCollection, func(ctx krt.HandlerContext, c ConfigKind) *config.Config {
+                       if c.GroupVersionKind != gvk {
+                               return nil
+                       }
+                       return c.Config
+               }, opts.WithName(gvk.Kind)...)
+
+               data[gvk] = kindStore{
+                       collection: perKind,
+                       index:      krt.NewNamespaceIndex(perKind),
+               }
+       }
+
+       return &Controller{data: data, schemas: schemas, stop: stop}, nil
+}
+
+// Schemas returns the schema set the controller was created with.
+func (c *Controller) Schemas() collection.Schemas {
+       return c.schemas
+}
+
+// Get looks up a single config of kind typ. The collection key is name alone
+// when namespace is empty and "namespace/name" otherwise. It returns nil when
+// typ is not served by this controller.
+func (c *Controller) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
+       data, ok := c.data[typ]
+       if !ok {
+               return nil
+       }
+
+       key := name
+       if namespace != "" {
+               key = namespace + "/" + name
+       }
+       return data.collection.GetKey(key)
+}
+
+// List returns the configs of kind typ. With metav1.NamespaceAll the whole
+// collection is listed; otherwise the namespace index is consulted. It
+// returns nil when typ is not served by this controller.
+func (c *Controller) List(typ config.GroupVersionKind, namespace string) []config.Config {
+       data, ok := c.data[typ]
+       switch {
+       case !ok:
+               return nil
+       case namespace == metav1.NamespaceAll:
+               return data.collection.List()
+       default:
+               return data.index.Lookup(namespace)
+       }
+}
+
+// Create is not supported by the read-only file store; it always returns
+// errUnsupportedOp.
+func (c *Controller) Create(config config.Config) (revision string, err error) 
{
+       return "", errUnsupportedOp
+}
+
+// Update is not supported by the read-only file store; it always returns
+// errUnsupportedOp.
+func (c *Controller) Update(config config.Config) (newRevision string, err 
error) {
+       return "", errUnsupportedOp
+}
+
+// UpdateStatus is not supported by the read-only file store; it always
+// returns errUnsupportedOp.
+func (c *Controller) UpdateStatus(config config.Config) (newRevision string, 
err error) {
+       return "", errUnsupportedOp
+}
+
+// Patch is not supported by the read-only file store; it always returns
+// errUnsupportedOp.
+func (c *Controller) Patch(orig config.Config, patchFn config.PatchFunc) 
(string, error) {
+       return "", errUnsupportedOp
+}
+
+// Delete is not supported by the read-only file store; it always returns
+// errUnsupportedOp.
+func (c *Controller) Delete(typ config.GroupVersionKind, name, namespace 
string, _ *string) error {
+       return errUnsupportedOp
+}
+
+// RegisterEventHandler subscribes handler to changes of kind typ. Collection
+// events are translated into model events: adds and deletes are delivered
+// with an empty "old" config and the affected config as "current"; updates
+// carry both the old and the new config. Kinds this controller does not
+// serve are ignored.
+//
+// The registration is recorded on the kind's store so HasSynced can wait for
+// it. kindStore is held by value in c.data, so the modified copy has to be
+// written back to the map; previously the append only touched a local copy
+// and the registration was lost.
+//
+// NOTE(review): the write-back mutates c.data; this assumes handlers are
+// registered before the controller is used concurrently — confirm with callers.
+func (c *Controller) RegisterEventHandler(typ config.GroupVersionKind, handler model.EventHandler) {
+       data, ok := c.data[typ]
+       if !ok {
+               return
+       }
+
+       registration := data.collection.RegisterBatch(func(evs []krt.Event[config.Config]) {
+               for _, event := range evs {
+                       switch event.Event {
+                       case controllers.EventAdd:
+                               handler(config.Config{}, *event.New, model.EventAdd)
+                       case controllers.EventUpdate:
+                               handler(*event.Old, *event.New, model.EventUpdate)
+                       case controllers.EventDelete:
+                               handler(config.Config{}, *event.Old, model.EventDelete)
+                       }
+               }
+       }, false)
+
+       data.handlers = append(data.handlers, registration)
+       c.data[typ] = data
+}
+
+// Run blocks until stop is closed and then closes the controller's internal
+// stop channel, the one handed to the folder watch and krt options in
+// NewController.
+func (c *Controller) Run(stop <-chan struct{}) {
+       <-stop
+       close(c.stop)
+}
+
+// HasSynced reports whether every per-kind collection, and every handler
+// registration recorded for it, has synced.
+func (c *Controller) HasSynced() bool {
+       for _, perKind := range c.data {
+               if synced := perKind.collection.HasSynced(); !synced {
+                       return false
+               }
+               for i := range perKind.handlers {
+                       if !perKind.handlers[i].HasSynced() {
+                               return false
+                       }
+               }
+       }
+       return true
+}
+
+// parseInputs runs crd.ParseInputs over data and returns pointers to the
+// parsed configs, after setting each config's Domain to domainSuffix. The
+// list of unrecognized kinds is discarded; the parse error, if any, is
+// returned alongside the (possibly empty) slice.
+func parseInputs(data []byte, domainSuffix string) ([]*config.Config, error) {
+       configs, _, err := crd.ParseInputs(string(data))
+
+       // Point into the parsed slice rather than copying the configs.
+       refs := make([]*config.Config, 0, len(configs))
+       for i := range configs {
+               cfg := &configs[i]
+               cfg.Domain = domainSuffix
+               refs = append(refs, cfg)
+       }
+       return refs, err
+}
diff --git a/sail/pkg/config/memory/controller.go 
b/sail/pkg/config/memory/controller.go
new file mode 100644
index 00000000..06c121d8
--- /dev/null
+++ b/sail/pkg/config/memory/controller.go
@@ -0,0 +1,140 @@
+package memory
+
+import (
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+       "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "k8s.io/apimachinery/pkg/types"
+)
+
+// Controller is an implementation of ConfigStoreController that wraps a
+// ConfigStore and schedules an event on its monitor after each successful
+// write.
+type Controller struct {
+       // monitor receives the events scheduled by the write methods and owns
+       // the registered event handlers.
+       monitor     Monitor
+       // configStore is the backing store all reads and writes go to.
+       configStore model.ConfigStore
+       // hasSynced, when set, decides the result of HasSynced.
+       hasSynced   func() bool
+
+       // If meshConfig.DiscoverySelectors are specified, the namespacesFilter
+       // tracks the namespaces this controller watches.
+       namespacesFilter func(obj interface{}) bool
+}
+
+// NewController returns an implementation of ConfigStoreController backed by
+// cs. It is a client-side monitor that dispatches events as changes are made
+// on the client.
+//
+// NOTE(review): the monitor field is not set yet (see the TODO), so methods
+// that call c.monitor will panic until it is wired in.
+func NewController(cs model.ConfigStore) *Controller {
+       // TODO monitor
+       return &Controller{configStore: cs}
+}
+
+// RegisterHasSyncedHandler installs cb as the function HasSynced consults,
+// replacing any callback set earlier.
+func (c *Controller) RegisterHasSyncedHandler(cb func() bool) {
+       c.hasSynced = cb
+}
+
+// RegisterEventHandler hands f to the monitor as an event handler for kind.
+func (c *Controller) RegisterEventHandler(kind config.GroupVersionKind, f 
model.EventHandler) {
+       c.monitor.AppendEventHandler(kind, f)
+}
+
+// HasSynced reports whether the store has synced. The answer comes from the
+// callback installed with RegisterHasSyncedHandler (for example by the data
+// source); when no callback is set the store is always considered synced.
+func (c *Controller) HasSynced() bool {
+       if c.hasSynced == nil {
+               return true
+       }
+       return c.hasSynced()
+}
+
+// Run delegates to the monitor's Run with the given stop channel.
+func (c *Controller) Run(stop <-chan struct{}) {
+       c.monitor.Run(stop)
+}
+
+// Schemas returns the schemas of the backing config store.
+func (c *Controller) Schemas() collection.Schemas {
+       return c.configStore.Schemas()
+}
+
+// Get returns the config identified by kind, key and namespace from the
+// backing store. When a namespacesFilter is set and it rejects the namespace,
+// Get returns nil without consulting the store.
+func (c *Controller) Get(kind config.GroupVersionKind, key, namespace string) 
*config.Config {
+       if c.namespacesFilter != nil && !c.namespacesFilter(namespace) {
+               return nil
+       }
+       return c.configStore.Get(kind, key, namespace)
+}
+
+// Create stores config in the backing store and, on success, schedules an
+// EventAdd for it on the monitor. The store's revision and error are returned
+// unchanged.
+func (c *Controller) Create(config config.Config) (revision string, err error) {
+       revision, err = c.configStore.Create(config)
+       if err != nil {
+               return revision, err
+       }
+       c.monitor.ScheduleProcessEvent(ConfigEvent{config: config, event: model.EventAdd})
+       return revision, nil
+}
+
+// Update replaces config in the backing store and, on success, schedules an
+// EventUpdate carrying both the previously stored config and the new one.
+func (c *Controller) Update(config config.Config) (newRevision string, err error) {
+       // Read the stored copy before it is overwritten; it becomes the event's "old".
+       previous := c.configStore.Get(config.GroupVersionKind, config.Name, config.Namespace)
+       newRevision, err = c.configStore.Update(config)
+       if err != nil {
+               return newRevision, err
+       }
+       c.monitor.ScheduleProcessEvent(ConfigEvent{
+               old:    *previous,
+               config: config,
+               event:  model.EventUpdate,
+       })
+       return newRevision, nil
+}
+
+// UpdateStatus updates the status of config in the backing store and, on
+// success, schedules an EventUpdate carrying both the previously stored
+// config and the new one.
+func (c *Controller) UpdateStatus(config config.Config) (newRevision string, err error) {
+       // Read the stored copy before it is overwritten; it becomes the event's "old".
+       previous := c.configStore.Get(config.GroupVersionKind, config.Name, config.Namespace)
+       newRevision, err = c.configStore.UpdateStatus(config)
+       if err != nil {
+               return newRevision, err
+       }
+       c.monitor.ScheduleProcessEvent(ConfigEvent{
+               old:    *previous,
+               config: config,
+               event:  model.EventUpdate,
+       })
+       return newRevision, nil
+}
+
+// Patch applies patchFn to a deep copy of orig, rejects any patch type other
+// than merge patch or JSON patch, forwards the patched config to the backing
+// store and, on success, schedules an EventUpdate from orig to the patched
+// config.
+func (c *Controller) Patch(orig config.Config, patchFn config.PatchFunc) (newRevision string, err error) {
+       cfg, typ := patchFn(orig.DeepCopy())
+       if typ != types.MergePatchType && typ != types.JSONPatchType {
+               return "", fmt.Errorf("unsupported merge type: %s", typ)
+       }
+
+       newRevision, err = c.configStore.Patch(cfg, patchFn)
+       if err != nil {
+               return newRevision, err
+       }
+       c.monitor.ScheduleProcessEvent(ConfigEvent{
+               old:    orig,
+               config: cfg,
+               event:  model.EventUpdate,
+       })
+       return newRevision, nil
+}
+
+// Delete removes the config identified by kind, key and namespace. The config
+// is first looked up through c.Get (so the namespace filter applies); if it
+// is not found an error is returned. After a successful delete in the backing
+// store an EventDelete carrying the removed config is scheduled.
+func (c *Controller) Delete(kind config.GroupVersionKind, key, namespace string, resourceVersion *string) error {
+       existing := c.Get(kind, key, namespace)
+       if existing == nil {
+               return fmt.Errorf("delete: config %v/%v/%v does not exist", kind, namespace, key)
+       }
+       if err := c.configStore.Delete(kind, key, namespace, resourceVersion); err != nil {
+               return err
+       }
+       c.monitor.ScheduleProcessEvent(ConfigEvent{
+               config: *existing,
+               event:  model.EventDelete,
+       })
+       return nil
+}
+
+// List returns the configs of the given kind and namespace from the backing
+// store. When a namespacesFilter is set, only configs it accepts are kept.
+//
+// NOTE(review): the filter receives the whole config.Config here, whereas Get
+// passes it the namespace string — confirm the filter handles both.
+func (c *Controller) List(kind config.GroupVersionKind, namespace string) []config.Config {
+       configs := c.configStore.List(kind, namespace)
+       if c.namespacesFilter == nil {
+               return configs
+       }
+       keep := func(cfg config.Config) bool {
+               return c.namespacesFilter(cfg)
+       }
+       return slices.Filter(configs, keep)
+}
diff --git a/sail/pkg/config/memory/monitor.go 
b/sail/pkg/config/memory/monitor.go
new file mode 100644
index 00000000..49daad07
--- /dev/null
+++ b/sail/pkg/config/memory/monitor.go
@@ -0,0 +1,22 @@
+package memory
+
+import (
+       config2 "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+)
+
+// Handler is a callback invoked for a config change. The controller in this
+// package fills events as (old, current, event) — presumably the same order
+// applies here; verify against the Monitor implementation.
+type Handler func(config2.Config, config2.Config, model.Event)
+
+// Monitor provides methods for reacting to changes in the config store.
+type Monitor interface {
+       // Run is called with the controller's stop channel.
+       Run(<-chan struct{})
+       // AppendEventHandler registers a handler for the given kind.
+       AppendEventHandler(config2.GroupVersionKind, Handler)
+       // ScheduleProcessEvent queues an event for processing.
+       ScheduleProcessEvent(ConfigEvent)
+}
+
+// ConfigEvent defines the event to be processed.
+type ConfigEvent struct {
+       // config is the config the event is about (the new or deleted one).
+       config config2.Config
+       // old is the previous version; the controller only sets it for updates.
+       old    config2.Config
+       // event is the kind of change.
+       event  model.Event
+}
diff --git a/sail/pkg/config/memory/store.go b/sail/pkg/config/memory/store.go
new file mode 100644
index 00000000..f36cde5e
--- /dev/null
+++ b/sail/pkg/config/memory/store.go
@@ -0,0 +1,250 @@
+package memory
+
+import (
+       "errors"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "sync"
+       "time"
+)
+
+// Sentinel errors returned by the in-memory store.
+var (
+       // errNotFound is returned when the requested item is not in the store.
+       errNotFound      = errors.New("item not found")
+       // errAlreadyExists is returned by Create when the item is already stored.
+       errAlreadyExists = errors.New("item already exists")
+       // TODO: can we make this compatible with kerror.IsConflict without
+       // importing the library?
+       errConflict = errors.New("conflicting resource version, try again")
+)
+
+// Make builds an in-memory config store for the given schemas.
+// Validation stays enabled: configs are checked against their schema on write.
+func Make(schemas collection.Schemas) model.ConfigStore {
+       const skipValidation = false
+       return newStore(schemas, skipValidation)
+}
+
+// store is an in-memory config store guarded by a read/write mutex.
+type store struct {
+       // schemas is the set of config schemas this store accepts.
+       schemas        collection.Schemas
+       // data holds the stored objects, indexed by kind, then namespace, then name.
+       data           map[config.GroupVersionKind]map[string]map[string]any
+       // skipValidation disables schema validation on write when true.
+       skipValidation bool
+       // mutex guards data.
+       mutex          sync.RWMutex
+}
+
+// newStore allocates an empty in-memory store and pre-creates the per-kind
+// index for every schema, so later lookups can tell known kinds from unknown
+// ones.
+func newStore(schemas collection.Schemas, skipValidation bool) model.ConfigStore {
+       st := &store{
+               schemas:        schemas,
+               skipValidation: skipValidation,
+               data:           map[config.GroupVersionKind]map[string]map[string]any{},
+       }
+       for _, schema := range schemas.All() {
+               st.data[schema.GroupVersionKind()] = map[string]map[string]any{}
+       }
+       return st
+}
+
+// Schemas returns the schemas the store was created with.
+func (cr *store) Schemas() collection.Schemas {
+       return cr.schemas
+}
+// Get returns a pointer to a copy of the config stored under
+// kind/namespace/name, or nil when the kind, the namespace or the name is
+// not present.
+func (cr *store) Get(kind config.GroupVersionKind, name, namespace string) *config.Config {
+       cr.mutex.RLock()
+       defer cr.mutex.RUnlock()
+
+       // Indexing a nil map is safe in Go, so one chained lookup covers the
+       // missing-kind, missing-namespace and missing-name cases alike.
+       item, found := cr.data[kind][namespace][name]
+       if !found {
+               return nil
+       }
+       cfg := item.(config.Config)
+       return &cfg
+}
+
+// List returns the configs of the given kind. An empty namespace selects
+// every namespace. The result is nil when the kind is unknown or when the
+// requested namespace has no index entry for that kind. Order is unspecified
+// because it follows map iteration.
+func (cr *store) List(kind config.GroupVersionKind, namespace string) []config.Config {
+       cr.mutex.RLock()
+       defer cr.mutex.RUnlock()
+
+       byNamespace, known := cr.data[kind]
+       if !known {
+               return nil
+       }
+
+       // Single-namespace listing.
+       if namespace != "" {
+               items, present := byNamespace[namespace]
+               if !present {
+                       return nil
+               }
+               out := make([]config.Config, 0, len(items))
+               for _, item := range items {
+                       out = append(out, item.(config.Config))
+               }
+               return out
+       }
+
+       // All namespaces: count first so the result is allocated once.
+       total := 0
+       for _, items := range byNamespace {
+               total += len(items)
+       }
+       out := make([]config.Config, 0, total)
+       for _, items := range byNamespace {
+               for _, item := range items {
+                       out = append(out, item.(config.Config))
+               }
+       }
+       return out
+}
+
+// Delete removes the config stored under kind/namespace/name. It fails with
+// an "unknown type" error when the kind has no index in the store and with
+// errNotFound when the namespace or the name is absent. resourceVersion is
+// accepted but not consulted by this implementation.
+func (cr *store) Delete(kind config.GroupVersionKind, name, namespace string, resourceVersion *string) error {
+       cr.mutex.Lock()
+       defer cr.mutex.Unlock()
+
+       byNamespace, known := cr.data[kind]
+       if !known {
+               return fmt.Errorf("unknown type %v", kind)
+       }
+       // A missing namespace yields a nil map, whose lookup reports "absent".
+       items := byNamespace[namespace]
+       if _, present := items[name]; !present {
+               return errNotFound
+       }
+       delete(items, name)
+       return nil
+}
+
+// Create stores cfg and returns its resource version. It fails when the kind
+// has no schema, when validation is enabled and rejects cfg, or with
+// errAlreadyExists when an item with the same kind/namespace/name is already
+// stored. An empty ResourceVersion or zero CreationTimestamp is filled in
+// from the current time.
+func (cr *store) Create(cfg config.Config) (string, error) {
+       cr.mutex.Lock()
+       defer cr.mutex.Unlock()
+
+       gvk := cfg.GroupVersionKind
+       schema, known := cr.schemas.FindByGroupVersionKind(gvk)
+       if !known {
+               return "", fmt.Errorf("unknown type %v", gvk)
+       }
+       if !cr.skipValidation {
+               if _, err := schema.ValidateConfig(cfg); err != nil {
+                       return "", err
+               }
+       }
+
+       // The namespace index is created lazily on first use.
+       items, present := cr.data[gvk][cfg.Namespace]
+       if !present {
+               items = map[string]any{}
+               cr.data[gvk][cfg.Namespace] = items
+       }
+       if _, taken := items[cfg.Name]; taken {
+               return "", errAlreadyExists
+       }
+
+       now := time.Now()
+       if cfg.ResourceVersion == "" {
+               cfg.ResourceVersion = now.String()
+       }
+       // Set the creation timestamp, if not provided.
+       if cfg.CreationTimestamp.IsZero() {
+               cfg.CreationTimestamp = now
+       }
+       items[cfg.Name] = cfg
+       return cfg.ResourceVersion, nil
+}
+
+// Update replaces the stored config that has cfg's kind/namespace/name and
+// returns the new resource version. It fails when the kind has no schema,
+// when validation is enabled and rejects cfg, with errNotFound when nothing
+// is stored under that key, and with errConflict when hasConflict reports a
+// resource version mismatch.
+func (cr *store) Update(cfg config.Config) (string, error) {
+       cr.mutex.Lock()
+       defer cr.mutex.Unlock()
+
+       gvk := cfg.GroupVersionKind
+       schema, known := cr.schemas.FindByGroupVersionKind(gvk)
+       if !known {
+               return "", fmt.Errorf("unknown type %v", gvk)
+       }
+       if !cr.skipValidation {
+               if _, err := schema.ValidateConfig(cfg); err != nil {
+                       return "", err
+               }
+       }
+
+       // A missing namespace yields a nil map, whose lookup reports "absent".
+       items := cr.data[gvk][cfg.Namespace]
+       current, present := items[cfg.Name]
+       if !present {
+               return "", errNotFound
+       }
+       if hasConflict(current.(config.Config), cfg) {
+               return "", errConflict
+       }
+
+       // TODO Annotations[ResourceVersion]
+       // Reading from a nil map yields "", so no explicit nil check is needed.
+       if pinned := cfg.Annotations[""]; pinned != "" {
+               cfg.ResourceVersion = pinned
+               delete(cfg.Annotations, "")
+       } else {
+               cfg.ResourceVersion = time.Now().String()
+       }
+
+       items[cfg.Name] = cfg
+       return cfg.ResourceVersion, nil
+}
+
+// UpdateStatus is handled exactly like Update: the whole config is replaced.
+func (cr *store) UpdateStatus(cfg config.Config) (string, error) {
+       return cr.Update(cfg)
+}
+
+// Patch applies patchFn to orig, stores the result under the patched
+// config's name in orig's namespace, and returns the new resource version.
+// It fails when the kind has no schema, when validation is enabled and
+// rejects the patched config, or with errNotFound when the kind or namespace
+// has no index in the store.
+// NOTE(review): the entry is written without checking that orig is currently
+// stored — confirm this is intended.
+func (cr *store) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) {
+       cr.mutex.Lock()
+       defer cr.mutex.Unlock()
+
+       gvk := orig.GroupVersionKind
+       schema, known := cr.schemas.FindByGroupVersionKind(gvk)
+       if !known {
+               return "", fmt.Errorf("unknown type %v", gvk)
+       }
+
+       // The second value returned by patchFn is not used by this store.
+       patched, _ := patchFn(orig)
+       if !cr.skipValidation {
+               if _, err := schema.ValidateConfig(patched); err != nil {
+                       return "", err
+               }
+       }
+
+       // A chained lookup reports "absent" for an unknown kind as well as for
+       // an unknown namespace, because indexing a nil map is safe.
+       items, present := cr.data[gvk][orig.Namespace]
+       if !present {
+               return "", errNotFound
+       }
+
+       rev := time.Now().String()
+       patched.ResourceVersion = rev
+       items[patched.Name] = patched
+       return rev, nil
+}
+
+// hasConflict reports whether replacement must not overwrite existing, which
+// blocks Update calls. An empty ResourceVersion on replacement means the
+// caller does not care about versions and never conflicts; otherwise the two
+// resource versions have to match.
+func hasConflict(existing, replacement config.Config) bool {
+       wanted := replacement.ResourceVersion
+       return wanted != "" && wanted != existing.ResourceVersion
+}
diff --git a/sail/pkg/credentials/kube/secrets.go 
b/sail/pkg/credentials/kube/secrets.go
new file mode 100644
index 00000000..191d2484
--- /dev/null
+++ b/sail/pkg/credentials/kube/secrets.go
@@ -0,0 +1,87 @@
+package kube
+
+import (
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/credentials"
+       "sort"
+       "strings"
+)
+
+// Data keys looked up in kubernetes secrets.
+const (
+       // GenericScrtCert is the ID/name for the certificate chain in a kubernetes generic secret.
+       GenericScrtCert = "cert"
+       // GenericScrtKey is the ID/name for the private key in a kubernetes generic secret.
+       GenericScrtKey = "key"
+       // GenericScrtCaCert is the ID/name for the CA certificate in a kubernetes generic secret.
+       GenericScrtCaCert = "cacert"
+       // GenericScrtCRL is the ID/name for the CRL in a kubernetes generic secret.
+       GenericScrtCRL = "crl"
+
+       // TLSSecretCert is the ID/name for the certificate chain in a kubernetes tls secret.
+       TLSSecretCert = "tls.crt"
+       // TLSSecretKey is the ID/name for the private key in a kubernetes tls secret.
+       TLSSecretKey = "tls.key"
+       // TLSSecretOcspStaple is the ID/name for the certificate OCSP staple in a kubernetes tls secret.
+       TLSSecretOcspStaple = "tls.ocsp-staple"
+       // TLSSecretCaCert is the ID/name for the CA certificate in a kubernetes tls secret.
+       TLSSecretCaCert = "ca.crt"
+       // TLSSecretCrl is the ID/name for the CRL in a kubernetes tls secret.
+       TLSSecretCrl = "ca.crl"
+)
+
+// hasKeys reports whether every one of keys is present in d, regardless of
+// the value stored under it. With no keys it returns true.
+func hasKeys(d map[string][]byte, keys ...string) bool {
+       for _, key := range keys {
+               if _, present := d[key]; !present {
+                       return false
+               }
+       }
+       return true
+}
+
+// hasValue reports whether every one of keys maps to a non-empty value in d.
+// A key that is missing, nil or zero-length yields false. With no keys it
+// returns true.
+func hasValue(d map[string][]byte, keys ...string) bool {
+       for _, key := range keys {
+               if len(d[key]) == 0 {
+                       return false
+               }
+       }
+       return true
+}
+
+// truncatedKeysMessage renders the keys of data, sorted, as a comma-separated
+// list for use in error messages. At most three keys are spelled out; any
+// further keys are summarised as "and N more...".
+func truncatedKeysMessage(data map[string][]byte) string {
+       const maxShown = 3
+       keys := make([]string, 0, len(data))
+       for k := range data {
+               keys = append(keys, k)
+       }
+       sort.Strings(keys)
+       // Use <= so that exactly maxShown keys are listed in full instead of
+       // being followed by a misleading "and 0 more...".
+       if len(keys) <= maxShown {
+               return strings.Join(keys, ", ")
+       }
+       return fmt.Sprintf("%s, and %d more...", strings.Join(keys[:maxShown], ", "), len(keys)-maxShown)
+}
+
+// ExtractRoot extracts the root certificate, and the CRL when present, from
+// secret data. The generic-secret key is preferred over the tls-secret key.
+// When neither key holds a value, the error says whether one of them was
+// present but empty, or lists the keys that were found.
+func ExtractRoot(data map[string][]byte) (certInfo *credentials.CertInfo, err error) {
+       // Candidate (CA cert key, CRL key) pairs in order of preference.
+       candidates := [...]struct{ cert, crl string }{
+               {GenericScrtCaCert, GenericScrtCRL},
+               {TLSSecretCaCert, TLSSecretCrl},
+       }
+       for _, c := range candidates {
+               if hasValue(data, c.cert) {
+                       return &credentials.CertInfo{Cert: data[c.cert], CRL: data[c.crl]}, nil
+               }
+       }
+       // No cert found. Try to generate a helpful error message
+       for _, c := range candidates {
+               if hasKeys(data, c.cert) {
+                       return nil, fmt.Errorf("found key %q, but it was empty", c.cert)
+               }
+       }
+       return nil, fmt.Errorf("found secret, but didn't have expected keys %s or %s; found: %s",
+               GenericScrtCaCert, TLSSecretCaCert, truncatedKeysMessage(data))
+}
diff --git a/sail/pkg/credentials/model.go b/sail/pkg/credentials/model.go
new file mode 100644
index 00000000..f806593d
--- /dev/null
+++ b/sail/pkg/credentials/model.go
@@ -0,0 +1,13 @@
+package credentials
+
+// CertInfo wraps a certificate, key, OCSP staple, and CRL information.
+type CertInfo struct {
+       // The certificate chain
+       Cert []byte
+       // The private key
+       Key []byte
+       // The OCSP staple
+       Staple []byte
+       // Certificate Revocation List information
+       CRL []byte
+}
diff --git a/sail/pkg/model/config.go b/sail/pkg/model/config.go
new file mode 100644
index 00000000..78e5b91e
--- /dev/null
+++ b/sail/pkg/model/config.go
@@ -0,0 +1,26 @@
+package model
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+)
+
+// ConfigStore is the storage interface for config.Config objects, addressed
+// by GroupVersionKind, name and namespace.
+type ConfigStore interface {
+       // Schemas returns the schemas known to the store.
+       Schemas() collection.Schemas
+       // Get returns the config with the given kind, name and namespace.
+       Get(typ config.GroupVersionKind, name, namespace string) *config.Config
+       // List returns the configs of the given kind in namespace.
+       List(typ config.GroupVersionKind, namespace string) []config.Config
+       // Create adds a new config and returns its revision.
+       Create(config config.Config) (revision string, err error)
+       // Update replaces an existing config and returns its new revision.
+       Update(config config.Config) (newRevision string, err error)
+       // UpdateStatus updates an existing config and returns its new revision.
+       // NOTE(review): presumably only the status part is meant to change —
+       // confirm per implementation.
+       UpdateStatus(config config.Config) (newRevision string, err error)
+       // Patch applies patchFn to orig and stores the result.
+       Patch(orig config.Config, patchFn config.PatchFunc) (string, error)
+       // Delete removes the config with the given kind, name and namespace.
+       Delete(typ config.GroupVersionKind, name, namespace string, 
resourceVersion *string) error
+}
+
+// EventHandler is a callback that receives two configs and the event relating
+// them. NOTE(review): the arguments are presumably (old, current, event) —
+// confirm against the callers.
+type EventHandler = func(config.Config, config.Config, Event)
+
+// ConfigStoreController is a ConfigStore that can additionally notify
+// registered handlers and be run until stopped.
+type ConfigStoreController interface {
+       ConfigStore
+       // RegisterEventHandler adds a handler for events on the given kind.
+       RegisterEventHandler(kind config.GroupVersionKind, handler EventHandler)
+       // Run starts the controller; stop is the channel used to end it.
+       Run(stop <-chan struct{})
+       // HasSynced reports whether the controller has synced.
+       // NOTE(review): the exact meaning of "synced" is implementation-defined.
+       HasSynced() bool
+}
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index 626c78d3..49705d61 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -32,7 +32,9 @@ import (
 type Watcher = meshwatcher.WatcherCollection
 
 type Environment struct {
+       ServiceDiscovery
        Watcher
+       ConfigStore
        mutex           sync.RWMutex
        pushContext     *PushContext
        Cache           XdsCache
@@ -107,5 +109,4 @@ func (e *Environment) GetDiscoveryAddress() (host.Name, 
string, error) {
        return host.Name(hostname), port, nil
 }
 
-type Proxy struct {
-}
+type Proxy struct{}
diff --git a/sail/pkg/model/controller.go b/sail/pkg/model/controller.go
new file mode 100644
index 00000000..a79c32f7
--- /dev/null
+++ b/sail/pkg/model/controller.go
@@ -0,0 +1,27 @@
+package model
+
+// Controller is anything that can be run until a stop signal arrives.
+type Controller interface {
+       // Run until a signal is received
+       Run(stop <-chan struct{})
+}
+
+// Event is the kind of change an event describes.
+type Event int
+
+const (
+       // EventAdd denotes an addition.
+       EventAdd Event = iota
+       // EventUpdate denotes a modification.
+       EventUpdate
+       // EventDelete denotes a removal.
+       EventDelete
+)
+
+// String returns "add", "update" or "delete" for the known events and
+// "unknown" for any other value.
+func (event Event) String() string {
+       switch event {
+       case EventAdd:
+               return "add"
+       case EventUpdate:
+               return "update"
+       case EventDelete:
+               return "delete"
+       default:
+               return "unknown"
+       }
+}
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index bf2e3b88..76870e34 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -46,3 +46,6 @@ func (pr *PushRequest) CopyMerge(other *PushRequest) 
*PushRequest {
        merged := &PushRequest{}
        return merged
 }
+
+// XDSUpdater currently declares no methods; it is a placeholder type.
+// NOTE(review): presumably meant to push changes to the xDS server — confirm.
+type XDSUpdater interface {
+}
\ No newline at end of file
diff --git a/sail/pkg/model/service.go b/sail/pkg/model/service.go
new file mode 100644
index 00000000..6573e2fb
--- /dev/null
+++ b/sail/pkg/model/service.go
@@ -0,0 +1,4 @@
+package model
+
+// ServiceDiscovery currently declares no methods; it is a placeholder type.
+type ServiceDiscovery interface {
+}
diff --git a/sail/pkg/networking/util/util.go b/sail/pkg/networking/util/util.go
new file mode 100644
index 00000000..471493b0
--- /dev/null
+++ b/sail/pkg/networking/util/util.go
@@ -0,0 +1,6 @@
+package util
+
+const (
+       // PassthroughFilterChain is the name of the filter chain that catches
+       // traffic that doesn't match other filter chains.
+       PassthroughFilterChain = "PassthroughFilterChain"
+)
\ No newline at end of file
diff --git a/sail/pkg/server/instance.go b/sail/pkg/server/instance.go
index af7847ba..1ac26f66 100644
--- a/sail/pkg/server/instance.go
+++ b/sail/pkg/server/instance.go
@@ -18,6 +18,7 @@
 package server
 
 import (
+       "k8s.io/klog/v2"
        "time"
 )
 
@@ -30,6 +31,7 @@ type task struct {
 
 type Instance interface {
        Start(stop <-chan struct{}) error
+       RunComponent(name string, t Component)
 }
 
 type instance struct {
@@ -87,3 +89,12 @@ func (i *instance) Start(stop <-chan struct{}) error {
 
        return nil
 }
+
+// RunComponent hands the named component to the instance's components
+// channel. If the done channel is already readable the component is not
+// submitted and only a log line is written.
+func (i *instance) RunComponent(name string, t Component) {
+       // Non-blocking probe of the done channel.
+       select {
+       case <-i.done:
+               klog.Infof("attempting to run a new component %q after the server was shutdown", name)
+               return
+       default:
+       }
+       i.components <- task{name, t}
+}
diff --git a/sail/pkg/serviceregistry/aggregate/controller.go 
b/sail/pkg/serviceregistry/aggregate/controller.go
new file mode 100644
index 00000000..0b220755
--- /dev/null
+++ b/sail/pkg/serviceregistry/aggregate/controller.go
@@ -0,0 +1,55 @@
+package aggregate
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/cluster"
+       "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
+       "k8s.io/klog/v2"
+       "sync"
+)
+
+// Controller aggregates several service registries and runs them together.
+type Controller struct {
+       // registries are the registries started by Run.
+       registries      []*registryEntry
+       // storeLock is held by Run while it reads registries and sets running.
+       storeLock       sync.RWMutex
+       // running is set to true once Run has started the registries.
+       running         bool
+       // meshHolder is copied from Options.MeshHolder.
+       meshHolder      mesh.Holder
+       // configClusterID is copied from Options.ConfigClusterID.
+       configClusterID cluster.ID
+}
+
+// registryEntry pairs a registry with an optional stop channel of its own.
+type registryEntry struct {
+       serviceregistry.Instance
+       // stop, if not nil, is the per-registry stop chan. If nil, the server
+       // stop chan should be used to Run the registry.
+       stop <-chan struct{}
+}
+
+// Options carries the values NewController copies into a Controller.
+type Options struct {
+       // MeshHolder is stored as the controller's mesh holder.
+       MeshHolder      mesh.Holder
+       // ConfigClusterID is stored as the controller's config cluster ID.
+       ConfigClusterID cluster.ID
+}
+
+// NewController returns a Controller that is not running, has an empty
+// (non-nil) registry list and takes its mesh holder and config cluster ID
+// from opt.
+func NewController(opt Options) *Controller {
+       c := &Controller{
+               meshHolder:      opt.MeshHolder,
+               configClusterID: opt.ConfigClusterID,
+       }
+       // running keeps its zero value (false) until Run is called.
+       c.registries = []*registryEntry{}
+       return c
+}
+
+// Run starts every registry in its own goroutine, marks the controller as
+// running, and then blocks until stop is readable. A registry that has its
+// own stop channel is run with that channel; all others are run with stop.
+func (c *Controller) Run(stop <-chan struct{}) {
+       c.storeLock.Lock()
+       for _, entry := range c.registries {
+               // prefer the per-registry stop channel
+               stopCh := entry.stop
+               if stopCh == nil {
+                       stopCh = stop
+               }
+               go entry.Run(stopCh)
+       }
+       c.running = true
+       c.storeLock.Unlock()
+
+       <-stop
+       klog.Info("Registry Aggregator terminated")
+}
diff --git a/sail/pkg/serviceregistry/instance.go 
b/sail/pkg/serviceregistry/instance.go
new file mode 100644
index 00000000..f08a0bbb
--- /dev/null
+++ b/sail/pkg/serviceregistry/instance.go
@@ -0,0 +1,20 @@
+package serviceregistry
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/cluster"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+)
+
+// Instance of a service registry. A single service registry combines the
+// capabilities of service discovery and the controller for managing
+// asynchronous events.
+type Instance interface {
+       model.Controller
+       model.ServiceDiscovery
+
+       // Provider backing this service registry (i.e. Kubernetes etc.)
+       Provider() provider.ID
+
+       // Cluster for which the service registry applies. Only needed for
+       // multicluster systems.
+       Cluster() cluster.ID
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/controller.go 
b/sail/pkg/serviceregistry/kube/controller/controller.go
index 4d1a7582..dac2cf31 100644
--- a/sail/pkg/serviceregistry/kube/controller/controller.go
+++ b/sail/pkg/serviceregistry/kube/controller/controller.go
@@ -17,12 +17,28 @@
 
 package controller
 
-import "github.com/apache/dubbo-kubernetes/pkg/cluster"
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/cluster"
+       "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+       "github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+)
 
 type Options struct {
        KubernetesAPIQPS   float32
        KubernetesAPIBurst int
        DomainSuffix       string
-       ClusterID          cluster.ID
-       ClusterAliases     map[string]string
+       // XDSUpdater will push changes to the xDS server.
+       XDSUpdater model.XDSUpdater
+       // MeshNetworksWatcher observes changes to the mesh networks config.
+       MeshNetworksWatcher mesh.NetworksWatcher
+       // MeshWatcher observes changes to the mesh config
+       MeshWatcher           meshwatcher.WatcherCollection
+       ClusterID             cluster.ID
+       ClusterAliases        map[string]string
+       SystemNamespace       string
+       MeshServiceController *aggregate.Controller
+       KrtDebugger           *krt.DebugHandler
 }
diff --git a/sail/pkg/serviceregistry/providers/providers.go 
b/sail/pkg/serviceregistry/provider/provider.go
similarity index 98%
rename from sail/pkg/serviceregistry/providers/providers.go
rename to sail/pkg/serviceregistry/provider/provider.go
index f0532b1c..0d0f7b46 100644
--- a/sail/pkg/serviceregistry/providers/providers.go
+++ b/sail/pkg/serviceregistry/provider/provider.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package providers
+package provider
 
 // ID defines underlying platform supporting service registry
 type ID string
diff --git a/sail/pkg/xds/v3/model.go b/sail/pkg/xds/v3/model.go
new file mode 100644
index 00000000..790de20c
--- /dev/null
+++ b/sail/pkg/xds/v3/model.go
@@ -0,0 +1,11 @@
+package v3
+
+import "github.com/apache/dubbo-kubernetes/pkg/model"
+
+// Aliases for the type constants declared in pkg/model, re-exported under
+// the same names.
+const (
+       ClusterType  = model.ClusterType
+       ListenerType = model.ListenerType
+       EndpointType = model.EndpointType
+       RouteType    = model.RouteType
+       DebugType    = model.DebugType
+)
diff --git a/security/pkg/pki/ca/ca.go b/security/pkg/pki/ca/ca.go
index 48b85751..a9eb7a80 100644
--- a/security/pkg/pki/ca/ca.go
+++ b/security/pkg/pki/ca/ca.go
@@ -313,7 +313,7 @@ func loadSelfSignedCaSecret(client corev1.CoreV1Interface, 
namespace string, caC
                ); err != nil {
                        return fmt.Errorf("failed to create CA KeyCertBundle 
(%v)", err)
                }
-               klog.Infof("Using existing public key: %v", string(rootCerts))
+               klog.Infof("Using existing public key: \n%v", string(rootCerts))
        }
        return err
 }

Reply via email to