This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new 570bf720 [discovery] delete xds cache and webhook code (#797)
570bf720 is described below

commit 570bf72010e0446174d3849cb75626e72b87ea08
Author: Jian Zhong <11638...@qq.com>
AuthorDate: Tue Sep 30 21:08:12 2025 +0800

    [discovery] delete xds cache and webhook code (#797)
---
 go.mod                                   |   1 +
 go.sum                                   |   2 +
 pkg/kube/inject/webhook.go               |   4 +
 pkg/util/hash/hash.go                    |  45 ++++++++
 pkg/xds/server.go                        |  27 -----
 sail/pkg/bootstrap/server.go             |  19 +++-
 sail/pkg/config/kube/crdclient/client.go |   3 +
 sail/pkg/features/tuning.go              |  10 +-
 sail/pkg/model/config.go                 |  29 +++++
 sail/pkg/model/context.go                |  16 ---
 sail/pkg/model/push_context.go           | 139 +++++++++++++++++++-----
 sail/pkg/model/typed_xds_cache.go        |  35 ------
 sail/pkg/model/xds_cache.go              |  53 ---------
 sail/pkg/xds/ads.go                      |  38 +++++++
 sail/pkg/xds/discovery.go                | 179 ++++++++++++++++++++++++++++++-
 15 files changed, 435 insertions(+), 165 deletions(-)

diff --git a/go.mod b/go.mod
index c9d60e36..a6b993a4 100644
--- a/go.mod
+++ b/go.mod
@@ -45,6 +45,7 @@ require (
        github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
        github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
        github.com/hashicorp/go-multierror v1.1.1
+       github.com/hashicorp/golang-lru/v2 v2.0.5
        github.com/heroku/color v0.0.6
        github.com/moby/term v0.5.2
        github.com/ory/viper v1.7.5
diff --git a/go.sum b/go.sum
index 310ea56b..5fc89569 100644
--- a/go.sum
+++ b/go.sum
@@ -392,6 +392,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
 github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
 github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
+github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
+github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
 github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 github.com/heroku/color v0.0.6 h1:UTFFMrmMLFcL3OweqP1lAdp8i1y/9oHqkeHjQ/b/Ny0=
diff --git a/pkg/kube/inject/webhook.go b/pkg/kube/inject/webhook.go
new file mode 100644
index 00000000..5f8858f9
--- /dev/null
+++ b/pkg/kube/inject/webhook.go
@@ -0,0 +1,4 @@
+package inject
+
+type Webhook struct {
+}
diff --git a/pkg/util/hash/hash.go b/pkg/util/hash/hash.go
new file mode 100644
index 00000000..da7a02bd
--- /dev/null
+++ b/pkg/util/hash/hash.go
@@ -0,0 +1,45 @@
+package hash
+
+import (
+       "encoding/hex"
+
+       "github.com/cespare/xxhash/v2"
+)
+
+type Hash interface {
+       Write(p []byte) (n int)
+       WriteString(s string) (n int)
+       Sum() string
+       Sum64() uint64
+}
+
+type instance struct {
+       hash *xxhash.Digest
+}
+
+var _ Hash = &instance{}
+
+func New() Hash {
+       return &instance{
+               hash: xxhash.New(),
+       }
+}
+
+func (i *instance) Write(p []byte) (n int) {
+       n, _ = i.hash.Write(p)
+       return
+}
+
+func (i *instance) WriteString(s string) (n int) {
+       n, _ = i.hash.WriteString(s)
+       return
+}
+
+func (i *instance) Sum64() uint64 {
+       return i.hash.Sum64()
+}
+
+func (i *instance) Sum() string {
+       sum := i.hash.Sum(nil)
+       return hex.EncodeToString(sum)
+}
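
The new pkg/util/hash package is a thin wrapper over xxhash that deliberately drops Write errors (xxhash's Write never fails). A minimal usage sketch of the interface above; the caller below is hypothetical, not part of this commit:

    package main

    import (
        "fmt"

        "github.com/apache/dubbo-kubernetes/pkg/util/hash"
    )

    func main() {
        h := hash.New()
        h.WriteString("demo")  // returns bytes written; the xxhash error is intentionally dropped
        h.Write([]byte("/payload"))
        fmt.Println(h.Sum64()) // raw uint64 digest
        fmt.Println(h.Sum())   // the same digest, hex-encoded
    }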
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index 814630db..3a213c9d 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -95,26 +95,10 @@ func NewConnection(peerAddr string, stream DiscoveryStream) Connection {
 
 func Stream(ctx ConnectionContext) error {
        con := ctx.XdsConnection()
-       // Do not call: defer close(con.pushChannel). The push channel will be garbage collected
-       // when the connection is no longer used. Closing the channel can cause subtle race conditions
-       // with push. According to the spec: "It's only necessary to close a channel when it is important
-       // to tell the receiving goroutines that all data have been sent."
-
-       // Block until either a request is received or a push is triggered.
-       // We need 2 go routines because 'read' blocks in Recv().
        go Receive(ctx)
-
-       // Wait for the proxy to be fully initialized before we start serving traffic. Because
-       // initialization doesn't have dependencies that will block, there is no need to add any timeout
-       // here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
-       // reqChannel and the connection not being enqueued for pushes to pushChannel until the
-       // initialization is complete.
        <-con.initialized
 
        for {
-               // Go select{} statements are not ordered; the same channel can be chosen many times.
-               // For requests, these are higher priority (client may be blocked on startup until these are done)
-               // and often very cheap to handle (simple ACK), so we check it first.
                select {
                case req, ok := <-con.reqChan:
                        if ok {
@@ -129,10 +113,6 @@ func Stream(ctx ConnectionContext) error {
                        return nil
                default:
                }
-               // If there wasn't already a request, poll for requests and pushes. Note: if we have a huge
-               // amount of incoming requests, we may still send some pushes, as we do not `continue` above;
-               // however, requests will be handled ~2x as much as pushes. This ensures a wave of requests
-               // cannot completely starve pushes. However, this scenario is unlikely.
                select {
                case req, ok := <-con.reqChan:
                        if ok {
@@ -140,7 +120,6 @@ func Stream(ctx ConnectionContext) error {
                                        return err
                                }
                        } else {
-                               // Remote side closed connection or error processing the request.
                                return <-con.errorChan
                        }
                case pushEv := <-con.pushChannel:
@@ -159,7 +138,6 @@ func Receive(ctx ConnectionContext) {
        defer func() {
                close(con.errorChan)
                close(con.reqChan)
-               // Close the initialized channel, if its not already closed, to prevent blocking the stream.
                select {
                case <-con.initialized:
                default:
@@ -179,9 +157,7 @@ func Receive(ctx ConnectionContext) {
                        klog.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err)
                        return
                }
-               // This should be only set for the first request. The node id may not be set - for example malicious clients.
                if firstRequest {
-                       // probe happens before envoy sends first xDS request
                        if req.TypeUrl == model.HealthInfoType {
                                klog.Warningf("ADS: %q %s send health check probe before normal xDS request", con.peerAddr, con.conID)
                                continue
@@ -227,9 +203,6 @@ func (conn *Connection) MarkInitialized() {
 func ShouldRespond(w Watcher, id string, request *discovery.DiscoveryRequest) (bool, ResourceDelta) {
        stype := model.GetShortType(request.TypeUrl)
 
-       // If there is an error in request that means previous response is erroneous.
-       // We do not have to respond in that case. In this case request's version info
-       // will be different from the version sent. But it is fragile to rely on that.
        if request.ErrorDetail != nil {
                errCode := codes.Code(request.ErrorDetail.Code)
                klog.Warningf("ADS:%s: ACK ERROR %s %s:%s", stype, id, errCode.String(), request.ErrorDetail.GetMessage())
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index fed67bdb..1a60f80d 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/h2c"
        dubbokeepalive "github.com/apache/dubbo-kubernetes/pkg/keepalive"
        kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/inject"
        "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
        "github.com/apache/dubbo-kubernetes/pkg/kube/namespace"
        sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
@@ -109,6 +110,13 @@ type Server struct {
        dubbodCertBundleWatcher *keycertbundle.Watcher
 
        readinessProbes map[string]readinessProbe
+
+       webhookInfo *webhookInfo
+}
+
+type webhookInfo struct {
+       mu sync.RWMutex
+       wh *inject.Webhook
 }
 
 type readinessProbe func() bool
@@ -215,7 +223,7 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) {
 
        // TODO initRegistryEventHandlers?
 
-       // TODO initDiscoveryService?
+       s.initDiscoveryService()
 
        s.startCA(caOpts)
 
@@ -286,6 +294,15 @@ func (s *Server) Start(stop <-chan struct{}) error {
        return nil
 }
 
+func (s *Server) initDiscoveryService() {
+       klog.Infof("starting discovery service")
+       s.addStartFunc("xds server", func(stop <-chan struct{}) error {
+               klog.Infof("Starting ADS server")
+               s.XDSServer.Start(stop)
+               return nil
+       })
+}
+
 func (s *Server) startCA(caOpts *caOptions) {
        if s.CA == nil && s.RA == nil {
                return
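
initDiscoveryService defers the actual ADS start to the server's start-function list, the same registration mechanism used elsewhere in bootstrap. A hedged sketch of that pattern; the "debug server" component and runDebugServer are invented for illustration, only addStartFunc's shape is taken from the diff above:

    // Inside package bootstrap; hypothetical extra component registered the same way.
    s.addStartFunc("debug server", func(stop <-chan struct{}) error {
        klog.Infof("Starting debug server")
        go runDebugServer(stop) // invented helper; long-running work must leave the start function
        return nil
    })

Each registered function runs on Server.Start with the shared stop channel, so it must return promptly; s.XDSServer.Start does this by spawning its own goroutines, as the discovery.go diff below shows.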
diff --git a/sail/pkg/config/kube/crdclient/client.go b/sail/pkg/config/kube/crdclient/client.go
new file mode 100644
index 00000000..da7070cb
--- /dev/null
+++ b/sail/pkg/config/kube/crdclient/client.go
@@ -0,0 +1,3 @@
+package crdclient
+
+type Client struct{}
diff --git a/sail/pkg/features/tuning.go b/sail/pkg/features/tuning.go
index a99ba302..1b29b630 100644
--- a/sail/pkg/features/tuning.go
+++ b/sail/pkg/features/tuning.go
@@ -17,7 +17,10 @@
 
 package features
 
-import "github.com/apache/dubbo-kubernetes/pkg/env"
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/env"
+       "time"
+)
 
 var (
        MaxConcurrentStreams = env.Register(
@@ -32,4 +35,9 @@ var (
                4*1024*1024,
                "Sets the max receive buffer size of gRPC stream in bytes.",
        ).Get()
+
+       XDSCacheMaxSize = env.Register("SAIL_XDS_CACHE_SIZE", 60000,
+               "The maximum number of cache entries for the XDS cache.").Get()
+       XDSCacheIndexClearInterval = env.Register("SAIL_XDS_CACHE_INDEX_CLEAR_INTERVAL", 5*time.Second,
+               "The interval for xds cache index clearing.").Get()
 )
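
Both knobs resolve once via .Get() at package initialization, so the environment variables must be exported before the process starts. A small sketch of reading the resolved values; the printed defaults assume no overrides are set:

    package main

    import (
        "fmt"

        "github.com/apache/dubbo-kubernetes/sail/pkg/features"
    )

    func main() {
        fmt.Println(features.XDSCacheMaxSize)            // 60000 unless SAIL_XDS_CACHE_SIZE is set
        fmt.Println(features.XDSCacheIndexClearInterval) // 5s unless SAIL_XDS_CACHE_INDEX_CLEAR_INTERVAL is set
    }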
diff --git a/sail/pkg/model/config.go b/sail/pkg/model/config.go
index 6619a5d9..9cd803ae 100644
--- a/sail/pkg/model/config.go
+++ b/sail/pkg/model/config.go
@@ -4,9 +4,14 @@ import (
        "cmp"
        "github.com/apache/dubbo-kubernetes/pkg/config"
        "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+       "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
+       "github.com/apache/dubbo-kubernetes/pkg/util/hash"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "sort"
 )
 
+type ConfigHash uint64
+
 const (
        NamespaceAll = ""
 )
@@ -43,3 +48,27 @@ func sortConfigByCreationTime(configs []config.Config) []config.Config {
        })
        return configs
 }
+
+func (key ConfigKey) String() string {
+       return key.Kind.String() + "/" + key.Namespace + "/" + key.Name
+}
+
+func HasConfigsOfKind(configs sets.Set[ConfigKey], kind kind.Kind) bool {
+       for c := range configs {
+               if c.Kind == kind {
+                       return true
+               }
+       }
+       return false
+}
+
+func (key ConfigKey) HashCode() ConfigHash {
+       h := hash.New()
+       h.Write([]byte{byte(key.Kind)})
+       // Add separator / to avoid collision.
+       h.WriteString("/")
+       h.WriteString(key.Namespace)
+       h.WriteString("/")
+       h.WriteString(key.Name)
+       return ConfigHash(h.Sum64())
+}
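
ConfigKey.HashCode gives each config identity a stable 64-bit fingerprint: the kind byte, then "/"-separated namespace and name, run through the new hash package. A usage sketch; kind.ServiceEntry and the sets.New constructor are assumed to exist in the schema and util packages, everything else is from the diff:

    package main

    import (
        "fmt"

        "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
    )

    func main() {
        key := model.ConfigKey{Kind: kind.ServiceEntry, Namespace: "demo", Name: "my-service"}
        fmt.Println(key.String())   // e.g. ServiceEntry/demo/my-service
        fmt.Println(key.HashCode()) // stable model.ConfigHash (xxhash of kind byte + "/" + namespace + "/" + name)

        updated := sets.New(key)
        fmt.Println(model.HasConfigsOfKind(updated, kind.ServiceEntry)) // true
    }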
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index e6a2f4f1..98c0bd75 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -23,7 +23,6 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
        "github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
        "github.com/apache/dubbo-kubernetes/pkg/xds"
-       "github.com/apache/dubbo-kubernetes/sail/pkg/features"
        meshconfig "istio.io/api/mesh/v1alpha1"
        "net"
        "strconv"
@@ -40,30 +39,15 @@ type Environment struct {
        ConfigStore
        mutex                sync.RWMutex
        pushContext          *PushContext
-       Cache                XdsCache
        NetworksWatcher      mesh.NetworksWatcher
        NetworkManager       *NetworkManager
        clusterLocalServices ClusterLocalProvider
        DomainSuffix         string
 }
 
-type XdsCacheImpl struct {
-       cds typedXdsCache[uint64]
-       eds typedXdsCache[uint64]
-       rds typedXdsCache[uint64]
-       sds typedXdsCache[string]
-}
-
 func NewEnvironment() *Environment {
-       var cache XdsCache
-       if features.EnableXDSCaching {
-               cache = NewXdsCache()
-       } else {
-               cache = DisabledCache{}
-       }
        return &Environment{
                pushContext: NewPushContext(),
-               Cache:       cache,
        }
 }
 
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index 4e941844..e1e48814 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -27,6 +27,13 @@ import (
        meshconfig "istio.io/api/mesh/v1alpha1"
        "k8s.io/apimachinery/pkg/types"
        "sync"
+       "time"
+)
+
+type TriggerReason string
+
+const (
+       UnknownTrigger TriggerReason = "unknown"
 )
 
 type PushContext struct {
@@ -51,26 +58,9 @@ type serviceAccountKey struct {
 }
 
 type virtualServiceIndex struct {
-       exportedToNamespaceByGateway map[types.NamespacedName][]config.Config
-       // this contains all the virtual services with exportTo "." and current namespace. The keys are namespace,gateway.
-       privateByNamespaceAndGateway map[types.NamespacedName][]config.Config
-       // This contains all virtual services whose exportTo is "*", keyed by gateway
-       publicByGateway map[string][]config.Config
-       // root vs namespace/name ->delegate vs virtualservice gvk/namespace/name
-       delegates map[ConfigKey][]ConfigKey
-
-       // This contains destination hosts of virtual services, keyed by gateway's namespace/name,
-       // only used when PILOT_FILTER_GATEWAY_CLUSTER_CONFIG is enabled
-       destinationsByGateway map[string]sets.String
-
-       // Map of VS hostname -> referenced hostnames
-       referencedDestinations map[string]sets.String
 }
 
 type destinationRuleIndex struct {
-       namespaceLocal      map[string]*consolidatedDestRules
-       exportedByNamespace map[string]*consolidatedDestRules
-       rootNamespaceLocal  *consolidatedDestRules
 }
 
 type consolidatedDestRules struct {
@@ -91,17 +81,17 @@ type ConsolidatedDestRule struct {
        from     []types.NamespacedName
 }
 
-type TriggerReason string
-
 type ReasonStats map[TriggerReason]int
 
 type PushRequest struct {
-       Reason          ReasonStats
-       ConfigsUpdated  sets.Set[ConfigKey]
-       Forced          bool
-       Full            bool
-       Push            *PushContext
-       LastPushContext *PushContext
+       Reason           ReasonStats
+       ConfigsUpdated   sets.Set[ConfigKey]
+       Forced           bool
+       Full             bool
+       Push             *PushContext
+       LastPushContext  *PushContext
+       AddressesUpdated sets.Set[string]
+       Start            time.Time
 }
 
 func NewPushContext() *PushContext {
@@ -132,8 +122,7 @@ func (pr *PushRequest) CopyMerge(other *PushRequest) *PushRequest {
        return merged
 }
 
-type XDSUpdater interface {
-}
+type XDSUpdater interface{}
 
 func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
        ps.initializeMutex.Lock()
@@ -271,3 +260,99 @@ func (ps *PushContext) updateContext(env *Environment, oldPushContext *PushConte
                ps.AuthzPolicies = oldPushContext.AuthzPolicies
        }
 }
+
+func (pr *PushRequest) Merge(other *PushRequest) *PushRequest {
+       if pr == nil {
+               return other
+       }
+       if other == nil {
+               return pr
+       }
+
+       // Keep the first (older) start time
+
+       // Merge the two reasons. Note that we shouldn't deduplicate here, or we would under count
+       if len(other.Reason) > 0 {
+               if pr.Reason == nil {
+                       pr.Reason = make(map[TriggerReason]int)
+               }
+               pr.Reason.Merge(other.Reason)
+       }
+
+       // If either is full we need a full push
+       pr.Full = pr.Full || other.Full
+
+       // If either is forced we need a forced push
+       pr.Forced = pr.Forced || other.Forced
+
+       // The other push context is presumed to be later and more up to date
+       if other.Push != nil {
+               pr.Push = other.Push
+       }
+
+       if pr.ConfigsUpdated == nil {
+               pr.ConfigsUpdated = other.ConfigsUpdated
+       } else {
+               pr.ConfigsUpdated.Merge(other.ConfigsUpdated)
+       }
+
+       if pr.AddressesUpdated == nil {
+               pr.AddressesUpdated = other.AddressesUpdated
+       } else {
+               pr.AddressesUpdated.Merge(other.AddressesUpdated)
+       }
+
+       return pr
+}
+
+func NewReasonStats(reasons ...TriggerReason) ReasonStats {
+       ret := make(ReasonStats)
+       for _, reason := range reasons {
+               ret.Add(reason)
+       }
+       return ret
+}
+
+func (r ReasonStats) Add(reason TriggerReason) {
+       r[reason]++
+}
+
+func (r ReasonStats) Merge(other ReasonStats) {
+       for reason, count := range other {
+               r[reason] += count
+       }
+}
+
+func (r ReasonStats) Count() int {
+       var ret int
+       for _, count := range r {
+               ret += count
+       }
+       return ret
+}
+
+func (ps *PushContext) GetAllServices() []*Service {
+       return ps.servicesExportedToNamespace(NamespaceAll)
+}
+
+func (ps *PushContext) servicesExportedToNamespace(ns string) []*Service {
+       var out []*Service
+
+       // First add private services and explicitly exportedTo services
+       if ns == NamespaceAll {
+               out = make([]*Service, 0, len(ps.ServiceIndex.privateByNamespace)+len(ps.ServiceIndex.public))
+               for _, privateServices := range ps.ServiceIndex.privateByNamespace {
+                       out = append(out, privateServices...)
+               }
+       } else {
+               out = make([]*Service, 0, len(ps.ServiceIndex.privateByNamespace[ns])+
+                       len(ps.ServiceIndex.exportedToNamespace[ns])+len(ps.ServiceIndex.public))
+               out = append(out, ps.ServiceIndex.privateByNamespace[ns]...)
+               out = append(out, ps.ServiceIndex.exportedToNamespace[ns]...)
+       }
+
+       // Second add public services
+       out = append(out, ps.ServiceIndex.public...)
+
+       return out
+}
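
Unlike CopyMerge earlier in the file, Merge mutates the receiver. A short sketch of the merge semantics; UnknownTrigger is the only reason constant in this diff, and the requests below are illustrative:

    package main

    import (
        "fmt"

        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
    )

    func main() {
        a := &model.PushRequest{Full: false, Reason: model.NewReasonStats(model.UnknownTrigger)}
        b := &model.PushRequest{Full: true, Reason: model.NewReasonStats(model.UnknownTrigger)}

        merged := a.Merge(b)               // returns a, modified in place
        fmt.Println(merged.Full)           // true: either side being full forces a full push
        fmt.Println(merged.Reason.Count()) // 2: reasons are summed, never deduplicated
    }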
diff --git a/sail/pkg/model/typed_xds_cache.go b/sail/pkg/model/typed_xds_cache.go
deleted file mode 100644
index 90ee70ca..00000000
--- a/sail/pkg/model/typed_xds_cache.go
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-type typedXdsCache[K comparable] interface {
-}
-
-type lruCache[K comparable] struct {
-}
-
-var _ typedXdsCache[uint64] = &lruCache[uint64]{}
-
-func newTypedXdsCache[K comparable]() typedXdsCache[K] {
-       cache := &lruCache[K]{}
-       return cache
-}
-
-type disabledCache[K comparable] struct{}
-
-var _ typedXdsCache[uint64] = &disabledCache[uint64]{}
diff --git a/sail/pkg/model/xds_cache.go b/sail/pkg/model/xds_cache.go
deleted file mode 100644
index 10e53ef6..00000000
--- a/sail/pkg/model/xds_cache.go
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
-       "github.com/apache/dubbo-kubernetes/sail/pkg/features"
-)
-
-type XdsCache interface {
-       Run(stop <-chan struct{})
-}
-
-type DisabledCache struct{}
-
-func NewXdsCache() XdsCache {
-       cache := XdsCacheImpl{
-               eds: newTypedXdsCache[uint64](),
-       }
-       if features.EnableCDSCaching {
-               cache.cds = newTypedXdsCache[uint64]()
-       } else {
-               cache.cds = disabledCache[uint64]{}
-       }
-       if features.EnableRDSCaching {
-               cache.rds = newTypedXdsCache[uint64]()
-       } else {
-               cache.rds = disabledCache[uint64]{}
-       }
-
-       cache.sds = newTypedXdsCache[string]()
-
-       return cache
-}
-
-func (x XdsCacheImpl) Run(stop <-chan struct{}) {}
-
-func (d DisabledCache) Run(stop <-chan struct{}) {
-}
diff --git a/sail/pkg/xds/ads.go b/sail/pkg/xds/ads.go
index 5591c2e6..79d5fa9d 100644
--- a/sail/pkg/xds/ads.go
+++ b/sail/pkg/xds/ads.go
@@ -19,10 +19,13 @@ package xds
 
 import (
        "context"
+       "github.com/apache/dubbo-kubernetes/pkg/maps"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "github.com/apache/dubbo-kubernetes/pkg/xds"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
        core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
        discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+       "k8s.io/klog/v2"
        "time"
 )
 
@@ -63,6 +66,41 @@ func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream) erro
        return s.Stream(stream)
 }
 
+func (s *DiscoveryServer) StartPush(req *model.PushRequest) {
+       req.Start = time.Now()
+       for _, p := range s.AllClients() {
+               s.pushQueue.Enqueue(p, req)
+       }
+}
+
+func (s *DiscoveryServer) AllClients() []*Connection {
+       s.adsClientsMutex.RLock()
+       defer s.adsClientsMutex.RUnlock()
+       return maps.Values(s.adsClients)
+}
+
+func (s *DiscoveryServer) AdsPushAll(req *model.PushRequest) {
+       if !req.Full {
+               klog.Infof("XDS: Incremental Pushing ConnectedEndpoints:%d", 
s.adsClientCount())
+       } else {
+               totalService := len(req.Push.GetAllServices())
+               klog.Infof("XDS: Pushing Services:%d ConnectedEndpoints:%d", 
totalService, s.adsClientCount())
+
+               // Make sure the ConfigsUpdated map exists
+               if req.ConfigsUpdated == nil {
+                       req.ConfigsUpdated = make(sets.Set[model.ConfigKey])
+               }
+       }
+
+       s.StartPush(req)
+}
+
+func (s *DiscoveryServer) adsClientCount() int {
+       s.adsClientsMutex.RLock()
+       defer s.adsClientsMutex.RUnlock()
+       return len(s.adsClients)
+}
+
 func (s *DiscoveryServer) initProxyMetadata(node *core.Node) (*model.Proxy, error) {
        return nil, nil
 }
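
A full push now fans out through StartPush to every connected client. A hedged in-package sketch of a call site; the helper itself is invented, only the fields it sets follow AdsPushAll above:

    // Hypothetical helper inside package xds, for illustration only.
    func (s *DiscoveryServer) pushAllFull() {
        req := &model.PushRequest{
            Full:   true,
            Reason: model.NewReasonStats(model.UnknownTrigger),
            Push:   s.globalPushContext(), // full pushes read req.Push.GetAllServices()
        }
        s.AdsPushAll(req) // logs counts, stamps req.Start, enqueues on every live connection
    }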
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index 57d9d4ba..7b00ce4f 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -18,6 +18,7 @@
 package xds
 
 import (
+       "fmt"
        "github.com/apache/dubbo-kubernetes/pkg/cluster"
        "github.com/apache/dubbo-kubernetes/pkg/kube/krt"
        "github.com/apache/dubbo-kubernetes/sail/pkg/model"
@@ -26,6 +27,8 @@ import (
        "golang.org/x/time/rate"
        "google.golang.org/grpc"
        "k8s.io/klog/v2"
+       "strconv"
+       "sync"
        "time"
 )
 
@@ -40,7 +43,6 @@ type DiscoveryServer struct {
        serverReady         atomic.Bool
        DiscoveryStartTime  time.Time
        ClusterAliases      map[cluster.ID]cluster.ID
-       Cache               model.XdsCache
        pushQueue           *PushQueue
        krtDebugger         *krt.DebugHandler
        InboundUpdates      *atomic.Int64
@@ -50,12 +52,13 @@ type DiscoveryServer struct {
        pushChannel         chan *model.PushRequest
        DebounceOptions     DebounceOptions
        concurrentPushLimit chan struct{}
+       adsClientsMutex     sync.RWMutex
+       adsClients          map[string]*Connection
 }
 
 func NewDiscoveryServer(env *model.Environment, clusterAliases map[string]string, debugger *krt.DebugHandler) *DiscoveryServer {
        out := &DiscoveryServer{
                Env:              env,
-               Cache:            env.Cache,
                krtDebugger:      debugger,
                InboundUpdates:   atomic.NewInt64(0),
                CommittedUpdates: atomic.NewInt64(0),
@@ -75,7 +78,6 @@ func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
 func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
        go s.handleUpdates(stopCh)
        go s.sendPushes(stopCh)
-       go s.Cache.Run(stopCh)
 }
 
 func (s *DiscoveryServer) CachesSynced() {
@@ -87,7 +89,13 @@ func (s *DiscoveryServer) Shutdown() {
        s.pushQueue.ShutDown()
 }
 
-func (s *DiscoveryServer) Push(req *model.PushRequest) {}
+func (s *DiscoveryServer) Push(req *model.PushRequest) {
+       if !req.Full {
+               req.Push = s.globalPushContext()
+               s.AdsPushAll(req)
+               return
+       }
+}
 
 func (s *DiscoveryServer) globalPushContext() *model.PushContext {
        return s.Env.PushContext()
@@ -102,6 +110,167 @@ func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
 }
 
 func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts DebounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
+       var timeChan <-chan time.Time
+       var startDebounce time.Time
+       var lastConfigUpdateTime time.Time
+
+       pushCounter := 0
+       debouncedEvents := 0
+
+       var req *model.PushRequest
+
+       free := true
+       freeCh := make(chan struct{}, 1)
+
+       push := func(req *model.PushRequest, debouncedEvents int, startDebounce time.Time) {
+               pushFn(req)
+               updateSent.Add(int64(debouncedEvents))
+               freeCh <- struct{}{}
+       }
+
+       pushWorker := func() {
+               eventDelay := time.Since(startDebounce)
+               quietTime := time.Since(lastConfigUpdateTime)
+               // it has been too long or quiet enough
+               if eventDelay >= opts.debounceMax || quietTime >= opts.DebounceAfter {
+                       if req != nil {
+                               pushCounter++
+                               if req.ConfigsUpdated == nil {
+                                       klog.Infof("Push debounce stable[%d] %d 
for reason %s: %v since last change, %v since last push, full=%v",
+                                               pushCounter, debouncedEvents, 
reasonsUpdated(req),
+                                               quietTime, eventDelay, req.Full)
+                               } else {
+                                       klog.Infof("Push debounce stable[%d] %d 
for config %s: %v since last change, %v since last push, full=%v",
+                                               pushCounter, debouncedEvents, 
configsUpdated(req),
+                                               quietTime, eventDelay, req.Full)
+                               }
+                               free = false
+                               go push(req, debouncedEvents, startDebounce)
+                               req = nil
+                               debouncedEvents = 0
+                       }
+               } else {
+                       timeChan = time.After(opts.DebounceAfter - quietTime)
+               }
+       }
+
+       for {
+               select {
+               case <-freeCh:
+                       free = true
+                       pushWorker()
+               case r := <-ch:
+                       // If reason is not set, record it as an unknown reason
+                       if len(r.Reason) == 0 {
+                               r.Reason = model.NewReasonStats(model.UnknownTrigger)
+                       }
+                       if !opts.enableEDSDebounce && !r.Full {
+                               // trigger push now, just for EDS
+                               go func(req *model.PushRequest) {
+                                       pushFn(req)
+                                       updateSent.Inc()
+                               }(r)
+                               continue
+                       }
+
+                       lastConfigUpdateTime = time.Now()
+                       if debouncedEvents == 0 {
+                               timeChan = time.After(opts.DebounceAfter)
+                               startDebounce = lastConfigUpdateTime
+                       }
+                       debouncedEvents++
+
+                       req = req.Merge(r)
+               case <-timeChan:
+                       if free {
+                               pushWorker()
+                       }
+               case <-stopCh:
+                       return
+               }
+       }
 }
 
-func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {}
+func reasonsUpdated(req *model.PushRequest) string {
+       var (
+               reason0, reason1            model.TriggerReason
+               reason0Cnt, reason1Cnt, idx int
+       )
+       for r, cnt := range req.Reason {
+               if idx == 0 {
+                       reason0, reason0Cnt = r, cnt
+               } else if idx == 1 {
+                       reason1, reason1Cnt = r, cnt
+               } else {
+                       break
+               }
+               idx++
+       }
+
+       switch len(req.Reason) {
+       case 0:
+               return "unknown"
+       case 1:
+               return fmt.Sprintf("%s:%d", reason0, reason0Cnt)
+       case 2:
+               return fmt.Sprintf("%s:%d and %s:%d", reason0, reason0Cnt, 
reason1, reason1Cnt)
+       default:
+               return fmt.Sprintf("%s:%d and %d(%d) more reasons", reason0, 
reason0Cnt, len(req.Reason)-1,
+                       req.Reason.Count()-reason0Cnt)
+       }
+}
+
+func configsUpdated(req *model.PushRequest) string {
+       configs := ""
+       for key := range req.ConfigsUpdated {
+               configs += key.String()
+               break
+       }
+       if len(req.ConfigsUpdated) > 1 {
+               more := " and " + strconv.Itoa(len(req.ConfigsUpdated)-1) + " more configs"
+               configs += more
+       }
+       return configs
+}
+
+func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
+       for {
+               select {
+               case <-stopCh:
+                       return
+               default:
+                       semaphore <- struct{}{}
+
+                       // Get the next proxy to push. This will block if there are no updates required.
+                       client, push, shuttingdown := queue.Dequeue()
+                       if shuttingdown {
+                               return
+                       }
+                       doneFunc := func() {
+                               queue.MarkDone(client)
+                               <-semaphore
+                       }
+
+                       var closed <-chan struct{}
+                       if client.deltaStream != nil {
+                               closed = client.deltaStream.Context().Done()
+                       } else {
+                               closed = client.StreamDone()
+                       }
+                       go func() {
+                               pushEv := &Event{
+                                       pushRequest: push,
+                                       done:        doneFunc,
+                               }
+
+                               select {
+                               case client.PushCh() <- pushEv:
+                                       return
+                               case <-closed: // grpc stream was closed
+                                       doneFunc()
+                                       klog.Infof("Client closed connection 
%v", client.ID())
+                               }
+                       }()
+               }
+       }
+}
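
The debounce loop above coalesces bursts of config events: a push fires only once updates have been quiet for DebounceAfter, or once debounceMax has elapsed since the burst began, and the free/freeCh handshake keeps at most one debounced push in flight. For intuition, a self-contained toy version of the same pattern; all names below are invented, this is not the code above:

    package main

    import (
        "fmt"
        "time"
    )

    // Toy debounce: coalesce events until the stream has been quiet for `after`,
    // or until `max` has elapsed since the first event of the burst.
    func debounce(events <-chan struct{}, after, max time.Duration, push func(n int)) {
        var timer <-chan time.Time // nil until the first event; a nil channel blocks forever
        var first, last time.Time
        pending := 0
        for {
            select {
            case <-events:
                last = time.Now()
                if pending == 0 {
                    first = last
                    timer = time.After(after)
                }
                pending++
            case <-timer:
                if time.Since(last) >= after || time.Since(first) >= max {
                    push(pending)
                    pending = 0
                } else {
                    timer = time.After(after - time.Since(last)) // not quiet long enough; re-arm
                }
            }
        }
    }

    func main() {
        events := make(chan struct{})
        go debounce(events, 100*time.Millisecond, time.Second, func(n int) {
            fmt.Printf("pushed once for %d coalesced events\n", n)
        })
        for i := 0; i < 5; i++ { // a burst of 5 events, 10ms apart
            events <- struct{}{}
            time.Sleep(10 * time.Millisecond)
        }
        time.Sleep(300 * time.Millisecond) // expect exactly one push covering all 5 events
    }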
