This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push: new 570bf720 [discovery] delete xds cache and webhook code (#797) 570bf720 is described below commit 570bf72010e0446174d3849cb75626e72b87ea08 Author: Jian Zhong <11638...@qq.com> AuthorDate: Tue Sep 30 21:08:12 2025 +0800 [discovery] delete xds cache and webhook code (#797) --- go.mod | 1 + go.sum | 2 + pkg/kube/inject/webhook.go | 4 + pkg/util/hash/hash.go | 45 ++++++++ pkg/xds/server.go | 27 ----- sail/pkg/bootstrap/server.go | 19 +++- sail/pkg/config/kube/crdclient/client.go | 3 + sail/pkg/features/tuning.go | 10 +- sail/pkg/model/config.go | 29 +++++ sail/pkg/model/context.go | 16 --- sail/pkg/model/push_context.go | 139 +++++++++++++++++++----- sail/pkg/model/typed_xds_cache.go | 35 ------ sail/pkg/model/xds_cache.go | 53 --------- sail/pkg/xds/ads.go | 38 +++++++ sail/pkg/xds/discovery.go | 179 ++++++++++++++++++++++++++++++- 15 files changed, 435 insertions(+), 165 deletions(-) diff --git a/go.mod b/go.mod index c9d60e36..a6b993a4 100644 --- a/go.mod +++ b/go.mod @@ -45,6 +45,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hashicorp/go-multierror v1.1.1 + github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/heroku/color v0.0.6 github.com/moby/term v0.5.2 github.com/ory/viper v1.7.5 diff --git a/go.sum b/go.sum index 310ea56b..5fc89569 100644 --- a/go.sum +++ b/go.sum @@ -392,6 +392,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4= +github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/heroku/color v0.0.6 h1:UTFFMrmMLFcL3OweqP1lAdp8i1y/9oHqkeHjQ/b/Ny0= diff --git a/pkg/kube/inject/webhook.go b/pkg/kube/inject/webhook.go new file mode 100644 index 00000000..5f8858f9 --- /dev/null +++ b/pkg/kube/inject/webhook.go @@ -0,0 +1,4 @@ +package inject + +type Webhook struct { +} diff --git a/pkg/util/hash/hash.go b/pkg/util/hash/hash.go new file mode 100644 index 00000000..da7a02bd --- /dev/null +++ b/pkg/util/hash/hash.go @@ -0,0 +1,45 @@ +package hash + +import ( + "encoding/hex" + + "github.com/cespare/xxhash/v2" +) + +type Hash interface { + Write(p []byte) (n int) + WriteString(s string) (n int) + Sum() string + Sum64() uint64 +} + +type instance struct { + hash *xxhash.Digest +} + +var _ Hash = &instance{} + +func New() Hash { + return &instance{ + hash: xxhash.New(), + } +} + +func (i *instance) Write(p []byte) (n int) { + n, _ = i.hash.Write(p) + return +} + +func (i *instance) WriteString(s string) (n int) { + n, _ = i.hash.WriteString(s) + return +} + +func (i *instance) Sum64() uint64 { + return i.hash.Sum64() +} + +func (i *instance) Sum() string { + sum := i.hash.Sum(nil) + return hex.EncodeToString(sum) +} diff --git a/pkg/xds/server.go b/pkg/xds/server.go index 814630db..3a213c9d 100644 --- a/pkg/xds/server.go +++ b/pkg/xds/server.go @@ -95,26 +95,10 @@ func NewConnection(peerAddr string, stream DiscoveryStream) 
Connection { func Stream(ctx ConnectionContext) error { con := ctx.XdsConnection() - // Do not call: defer close(con.pushChannel). The push channel will be garbage collected - // when the connection is no longer used. Closing the channel can cause subtle race conditions - // with push. According to the spec: "It's only necessary to close a channel when it is important - // to tell the receiving goroutines that all data have been sent." - - // Block until either a request is received or a push is triggered. - // We need 2 go routines because 'read' blocks in Recv(). go Receive(ctx) - - // Wait for the proxy to be fully initialized before we start serving traffic. Because - // initialization doesn't have dependencies that will block, there is no need to add any timeout - // here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to - // reqChannel and the connection not being enqueued for pushes to pushChannel until the - // initialization is complete. <-con.initialized for { - // Go select{} statements are not ordered; the same channel can be chosen many times. - // For requests, these are higher priority (client may be blocked on startup until these are done) - // and often very cheap to handle (simple ACK), so we check it first. select { case req, ok := <-con.reqChan: if ok { @@ -129,10 +113,6 @@ func Stream(ctx ConnectionContext) error { return nil default: } - // If there wasn't already a request, poll for requests and pushes. Note: if we have a huge - // amount of incoming requests, we may still send some pushes, as we do not `continue` above; - // however, requests will be handled ~2x as much as pushes. This ensures a wave of requests - // cannot completely starve pushes. However, this scenario is unlikely. select { case req, ok := <-con.reqChan: if ok { @@ -140,7 +120,6 @@ func Stream(ctx ConnectionContext) error { return err } } else { - // Remote side closed connection or error processing the request. return <-con.errorChan } case pushEv := <-con.pushChannel: @@ -159,7 +138,6 @@ func Receive(ctx ConnectionContext) { defer func() { close(con.errorChan) close(con.reqChan) - // Close the initialized channel, if its not already closed, to prevent blocking the stream. select { case <-con.initialized: default: @@ -179,9 +157,7 @@ func Receive(ctx ConnectionContext) { klog.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err) return } - // This should be only set for the first request. The node id may not be set - for example malicious clients. if firstRequest { - // probe happens before envoy sends first xDS request if req.TypeUrl == model.HealthInfoType { klog.Warningf("ADS: %q %s send health check probe before normal xDS request", con.peerAddr, con.conID) continue @@ -227,9 +203,6 @@ func (conn *Connection) MarkInitialized() { func ShouldRespond(w Watcher, id string, request *discovery.DiscoveryRequest) (bool, ResourceDelta) { stype := model.GetShortType(request.TypeUrl) - // If there is an error in request that means previous response is erroneous. - // We do not have to respond in that case. In this case request's version info - // will be different from the version sent. But it is fragile to rely on that. 
if request.ErrorDetail != nil { errCode := codes.Code(request.ErrorDetail.Code) klog.Warningf("ADS:%s: ACK ERROR %s %s:%s", stype, id, errCode.String(), request.ErrorDetail.GetMessage()) diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go index fed67bdb..1a60f80d 100644 --- a/sail/pkg/bootstrap/server.go +++ b/sail/pkg/bootstrap/server.go @@ -29,6 +29,7 @@ import ( "github.com/apache/dubbo-kubernetes/pkg/h2c" dubbokeepalive "github.com/apache/dubbo-kubernetes/pkg/keepalive" kubelib "github.com/apache/dubbo-kubernetes/pkg/kube" + "github.com/apache/dubbo-kubernetes/pkg/kube/inject" "github.com/apache/dubbo-kubernetes/pkg/kube/kclient" "github.com/apache/dubbo-kubernetes/pkg/kube/namespace" sec_model "github.com/apache/dubbo-kubernetes/pkg/model" @@ -109,6 +110,13 @@ type Server struct { dubbodCertBundleWatcher *keycertbundle.Watcher readinessProbes map[string]readinessProbe + + webhookInfo *webhookInfo +} + +type webhookInfo struct { + mu sync.RWMutex + wh *inject.Webhook } type readinessProbe func() bool @@ -215,7 +223,7 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) { // TODO initRegistryEventHandlers? - // TODO initDiscoveryService? + s.initDiscoveryService() s.startCA(caOpts) @@ -286,6 +294,15 @@ func (s *Server) Start(stop <-chan struct{}) error { return nil } +func (s *Server) initDiscoveryService() { + klog.Infof("starting discovery service") + s.addStartFunc("xds server", func(stop <-chan struct{}) error { + klog.Infof("Starting ADS server") + s.XDSServer.Start(stop) + return nil + }) +} + func (s *Server) startCA(caOpts *caOptions) { if s.CA == nil && s.RA == nil { return diff --git a/sail/pkg/config/kube/crdclient/client.go b/sail/pkg/config/kube/crdclient/client.go new file mode 100644 index 00000000..da7070cb --- /dev/null +++ b/sail/pkg/config/kube/crdclient/client.go @@ -0,0 +1,3 @@ +package crdclient + +type Client struct{} diff --git a/sail/pkg/features/tuning.go b/sail/pkg/features/tuning.go index a99ba302..1b29b630 100644 --- a/sail/pkg/features/tuning.go +++ b/sail/pkg/features/tuning.go @@ -17,7 +17,10 @@ package features -import "github.com/apache/dubbo-kubernetes/pkg/env" +import ( + "github.com/apache/dubbo-kubernetes/pkg/env" + "time" +) var ( MaxConcurrentStreams = env.Register( @@ -32,4 +35,9 @@ var ( 4*1024*1024, "Sets the max receive buffer size of gRPC stream in bytes.", ).Get() + + XDSCacheMaxSize = env.Register("SAIL_XDS_CACHE_SIZE", 60000, + "The maximum number of cache entries for the XDS cache.").Get() + XDSCacheIndexClearInterval = env.Register("SAIL_XDS_CACHE_INDEX_CLEAR_INTERVAL", 5*time.Second, + "The interval for xds cache index clearing.").Get() ) diff --git a/sail/pkg/model/config.go b/sail/pkg/model/config.go index 6619a5d9..9cd803ae 100644 --- a/sail/pkg/model/config.go +++ b/sail/pkg/model/config.go @@ -4,9 +4,14 @@ import ( "cmp" "github.com/apache/dubbo-kubernetes/pkg/config" "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection" + "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind" + "github.com/apache/dubbo-kubernetes/pkg/util/hash" + "github.com/apache/dubbo-kubernetes/pkg/util/sets" "sort" ) +type ConfigHash uint64 + const ( NamespaceAll = "" ) @@ -43,3 +48,27 @@ func sortConfigByCreationTime(configs []config.Config) []config.Config { }) return configs } + +func (key ConfigKey) String() string { + return key.Kind.String() + "/" + key.Namespace + "/" + key.Name +} + +func HasConfigsOfKind(configs sets.Set[ConfigKey], kind kind.Kind) bool { + for c := range configs { + if 
c.Kind == kind { + return true + } + } + return false +} + +func (key ConfigKey) HashCode() ConfigHash { + h := hash.New() + h.Write([]byte{byte(key.Kind)}) + // Add separator / to avoid collision. + h.WriteString("/") + h.WriteString(key.Namespace) + h.WriteString("/") + h.WriteString(key.Name) + return ConfigHash(h.Sum64()) +} diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go index e6a2f4f1..98c0bd75 100644 --- a/sail/pkg/model/context.go +++ b/sail/pkg/model/context.go @@ -23,7 +23,6 @@ import ( "github.com/apache/dubbo-kubernetes/pkg/config/mesh" "github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher" "github.com/apache/dubbo-kubernetes/pkg/xds" - "github.com/apache/dubbo-kubernetes/sail/pkg/features" meshconfig "istio.io/api/mesh/v1alpha1" "net" "strconv" @@ -40,30 +39,15 @@ type Environment struct { ConfigStore mutex sync.RWMutex pushContext *PushContext - Cache XdsCache NetworksWatcher mesh.NetworksWatcher NetworkManager *NetworkManager clusterLocalServices ClusterLocalProvider DomainSuffix string } -type XdsCacheImpl struct { - cds typedXdsCache[uint64] - eds typedXdsCache[uint64] - rds typedXdsCache[uint64] - sds typedXdsCache[string] -} - func NewEnvironment() *Environment { - var cache XdsCache - if features.EnableXDSCaching { - cache = NewXdsCache() - } else { - cache = DisabledCache{} - } return &Environment{ pushContext: NewPushContext(), - Cache: cache, } } diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go index 4e941844..e1e48814 100644 --- a/sail/pkg/model/push_context.go +++ b/sail/pkg/model/push_context.go @@ -27,6 +27,13 @@ import ( meshconfig "istio.io/api/mesh/v1alpha1" "k8s.io/apimachinery/pkg/types" "sync" + "time" +) + +type TriggerReason string + +const ( + UnknownTrigger TriggerReason = "unknown" ) type PushContext struct { @@ -51,26 +58,9 @@ type serviceAccountKey struct { } type virtualServiceIndex struct { - exportedToNamespaceByGateway map[types.NamespacedName][]config.Config - // this contains all the virtual services with exportTo "." and current namespace. The keys are namespace,gateway. 
- privateByNamespaceAndGateway map[types.NamespacedName][]config.Config - // This contains all virtual services whose exportTo is "*", keyed by gateway - publicByGateway map[string][]config.Config - // root vs namespace/name ->delegate vs virtualservice gvk/namespace/name - delegates map[ConfigKey][]ConfigKey - - // This contains destination hosts of virtual services, keyed by gateway's namespace/name, - // only used when PILOT_FILTER_GATEWAY_CLUSTER_CONFIG is enabled - destinationsByGateway map[string]sets.String - - // Map of VS hostname -> referenced hostnames - referencedDestinations map[string]sets.String } type destinationRuleIndex struct { - namespaceLocal map[string]*consolidatedDestRules - exportedByNamespace map[string]*consolidatedDestRules - rootNamespaceLocal *consolidatedDestRules } type consolidatedDestRules struct { @@ -91,17 +81,17 @@ type ConsolidatedDestRule struct { from []types.NamespacedName } -type TriggerReason string - type ReasonStats map[TriggerReason]int type PushRequest struct { - Reason ReasonStats - ConfigsUpdated sets.Set[ConfigKey] - Forced bool - Full bool - Push *PushContext - LastPushContext *PushContext + Reason ReasonStats + ConfigsUpdated sets.Set[ConfigKey] + Forced bool + Full bool + Push *PushContext + LastPushContext *PushContext + AddressesUpdated sets.Set[string] + Start time.Time } func NewPushContext() *PushContext { @@ -132,8 +122,7 @@ func (pr *PushRequest) CopyMerge(other *PushRequest) *PushRequest { return merged } -type XDSUpdater interface { -} +type XDSUpdater interface{} func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) { ps.initializeMutex.Lock() @@ -271,3 +260,99 @@ func (ps *PushContext) updateContext(env *Environment, oldPushContext *PushConte ps.AuthzPolicies = oldPushContext.AuthzPolicies } } + +func (pr *PushRequest) Merge(other *PushRequest) *PushRequest { + if pr == nil { + return other + } + if other == nil { + return pr + } + + // Keep the first (older) start time + + // Merge the two reasons. 
Note that we shouldn't deduplicate here, or we would under count + if len(other.Reason) > 0 { + if pr.Reason == nil { + pr.Reason = make(map[TriggerReason]int) + } + pr.Reason.Merge(other.Reason) + } + + // If either is full we need a full push + pr.Full = pr.Full || other.Full + + // If either is forced we need a forced push + pr.Forced = pr.Forced || other.Forced + + // The other push context is presumed to be later and more up to date + if other.Push != nil { + pr.Push = other.Push + } + + if pr.ConfigsUpdated == nil { + pr.ConfigsUpdated = other.ConfigsUpdated + } else { + pr.ConfigsUpdated.Merge(other.ConfigsUpdated) + } + + if pr.AddressesUpdated == nil { + pr.AddressesUpdated = other.AddressesUpdated + } else { + pr.AddressesUpdated.Merge(other.AddressesUpdated) + } + + return pr +} + +func NewReasonStats(reasons ...TriggerReason) ReasonStats { + ret := make(ReasonStats) + for _, reason := range reasons { + ret.Add(reason) + } + return ret +} + +func (r ReasonStats) Add(reason TriggerReason) { + r[reason]++ +} + +func (r ReasonStats) Merge(other ReasonStats) { + for reason, count := range other { + r[reason] += count + } +} + +func (r ReasonStats) Count() int { + var ret int + for _, count := range r { + ret += count + } + return ret +} + +func (ps *PushContext) GetAllServices() []*Service { + return ps.servicesExportedToNamespace(NamespaceAll) +} + +func (ps *PushContext) servicesExportedToNamespace(ns string) []*Service { + var out []*Service + + // First add private services and explicitly exportedTo services + if ns == NamespaceAll { + out = make([]*Service, 0, len(ps.ServiceIndex.privateByNamespace)+len(ps.ServiceIndex.public)) + for _, privateServices := range ps.ServiceIndex.privateByNamespace { + out = append(out, privateServices...) + } + } else { + out = make([]*Service, 0, len(ps.ServiceIndex.privateByNamespace[ns])+ + len(ps.ServiceIndex.exportedToNamespace[ns])+len(ps.ServiceIndex.public)) + out = append(out, ps.ServiceIndex.privateByNamespace[ns]...) + out = append(out, ps.ServiceIndex.exportedToNamespace[ns]...) + } + + // Second add public services + out = append(out, ps.ServiceIndex.public...) + + return out +} diff --git a/sail/pkg/model/typed_xds_cache.go b/sail/pkg/model/typed_xds_cache.go deleted file mode 100644 index 90ee70ca..00000000 --- a/sail/pkg/model/typed_xds_cache.go +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package model - -type typedXdsCache[K comparable] interface { -} - -type lruCache[K comparable] struct { -} - -var _ typedXdsCache[uint64] = &lruCache[uint64]{} - -func newTypedXdsCache[K comparable]() typedXdsCache[K] { - cache := &lruCache[K]{} - return cache -} - -type disabledCache[K comparable] struct{} - -var _ typedXdsCache[uint64] = &disabledCache[uint64]{} diff --git a/sail/pkg/model/xds_cache.go b/sail/pkg/model/xds_cache.go deleted file mode 100644 index 10e53ef6..00000000 --- a/sail/pkg/model/xds_cache.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package model - -import ( - "github.com/apache/dubbo-kubernetes/sail/pkg/features" -) - -type XdsCache interface { - Run(stop <-chan struct{}) -} - -type DisabledCache struct{} - -func NewXdsCache() XdsCache { - cache := XdsCacheImpl{ - eds: newTypedXdsCache[uint64](), - } - if features.EnableCDSCaching { - cache.cds = newTypedXdsCache[uint64]() - } else { - cache.cds = disabledCache[uint64]{} - } - if features.EnableRDSCaching { - cache.rds = newTypedXdsCache[uint64]() - } else { - cache.rds = disabledCache[uint64]{} - } - - cache.sds = newTypedXdsCache[string]() - - return cache -} - -func (x XdsCacheImpl) Run(stop <-chan struct{}) {} - -func (d DisabledCache) Run(stop <-chan struct{}) { -} diff --git a/sail/pkg/xds/ads.go b/sail/pkg/xds/ads.go index 5591c2e6..79d5fa9d 100644 --- a/sail/pkg/xds/ads.go +++ b/sail/pkg/xds/ads.go @@ -19,10 +19,13 @@ package xds import ( "context" + "github.com/apache/dubbo-kubernetes/pkg/maps" + "github.com/apache/dubbo-kubernetes/pkg/util/sets" "github.com/apache/dubbo-kubernetes/pkg/xds" "github.com/apache/dubbo-kubernetes/sail/pkg/model" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "k8s.io/klog/v2" "time" ) @@ -63,6 +66,41 @@ func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream) erro return s.Stream(stream) } +func (s *DiscoveryServer) StartPush(req *model.PushRequest) { + req.Start = time.Now() + for _, p := range s.AllClients() { + s.pushQueue.Enqueue(p, req) + } +} + +func (s *DiscoveryServer) AllClients() []*Connection { + s.adsClientsMutex.RLock() + defer s.adsClientsMutex.RUnlock() + return maps.Values(s.adsClients) +} + +func (s *DiscoveryServer) AdsPushAll(req *model.PushRequest) { + if !req.Full { + klog.Infof("XDS: Incremental Pushing ConnectedEndpoints:%d", s.adsClientCount()) + } else { + totalService := len(req.Push.GetAllServices()) + klog.Infof("XDS: Pushing Services:%d ConnectedEndpoints:%d", totalService, s.adsClientCount()) + + // Make sure the ConfigsUpdated map exists + if req.ConfigsUpdated == nil { + req.ConfigsUpdated = make(sets.Set[model.ConfigKey]) + } + } 
+ + s.StartPush(req) +} + +func (s *DiscoveryServer) adsClientCount() int { + s.adsClientsMutex.RLock() + defer s.adsClientsMutex.RUnlock() + return len(s.adsClients) +} + func (s *DiscoveryServer) initProxyMetadata(node *core.Node) (*model.Proxy, error) { return nil, nil } diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go index 57d9d4ba..7b00ce4f 100644 --- a/sail/pkg/xds/discovery.go +++ b/sail/pkg/xds/discovery.go @@ -18,6 +18,7 @@ package xds import ( + "fmt" "github.com/apache/dubbo-kubernetes/pkg/cluster" "github.com/apache/dubbo-kubernetes/pkg/kube/krt" "github.com/apache/dubbo-kubernetes/sail/pkg/model" @@ -26,6 +27,8 @@ import ( "golang.org/x/time/rate" "google.golang.org/grpc" "k8s.io/klog/v2" + "strconv" + "sync" "time" ) @@ -40,7 +43,6 @@ type DiscoveryServer struct { serverReady atomic.Bool DiscoveryStartTime time.Time ClusterAliases map[cluster.ID]cluster.ID - Cache model.XdsCache pushQueue *PushQueue krtDebugger *krt.DebugHandler InboundUpdates *atomic.Int64 @@ -50,12 +52,13 @@ type DiscoveryServer struct { pushChannel chan *model.PushRequest DebounceOptions DebounceOptions concurrentPushLimit chan struct{} + adsClientsMutex sync.RWMutex + adsClients map[string]*Connection } func NewDiscoveryServer(env *model.Environment, clusterAliases map[string]string, debugger *krt.DebugHandler) *DiscoveryServer { out := &DiscoveryServer{ Env: env, - Cache: env.Cache, krtDebugger: debugger, InboundUpdates: atomic.NewInt64(0), CommittedUpdates: atomic.NewInt64(0), @@ -75,7 +78,6 @@ func (s *DiscoveryServer) Register(rpcs *grpc.Server) { func (s *DiscoveryServer) Start(stopCh <-chan struct{}) { go s.handleUpdates(stopCh) go s.sendPushes(stopCh) - go s.Cache.Run(stopCh) } func (s *DiscoveryServer) CachesSynced() { @@ -87,7 +89,13 @@ func (s *DiscoveryServer) Shutdown() { s.pushQueue.ShutDown() } -func (s *DiscoveryServer) Push(req *model.PushRequest) {} +func (s *DiscoveryServer) Push(req *model.PushRequest) { + if !req.Full { + req.Push = s.globalPushContext() + s.AdsPushAll(req) + return + } +} func (s *DiscoveryServer) globalPushContext() *model.PushContext { return s.Env.PushContext() @@ -102,6 +110,167 @@ func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) { } func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts DebounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) { + var timeChan <-chan time.Time + var startDebounce time.Time + var lastConfigUpdateTime time.Time + + pushCounter := 0 + debouncedEvents := 0 + + var req *model.PushRequest + + free := true + freeCh := make(chan struct{}, 1) + + push := func(req *model.PushRequest, debouncedEvents int, startDebounce time.Time) { + pushFn(req) + updateSent.Add(int64(debouncedEvents)) + freeCh <- struct{}{} + } + + pushWorker := func() { + eventDelay := time.Since(startDebounce) + quietTime := time.Since(lastConfigUpdateTime) + // it has been too long or quiet enough + if eventDelay >= opts.debounceMax || quietTime >= opts.DebounceAfter { + if req != nil { + pushCounter++ + if req.ConfigsUpdated == nil { + klog.Infof("Push debounce stable[%d] %d for reason %s: %v since last change, %v since last push, full=%v", + pushCounter, debouncedEvents, reasonsUpdated(req), + quietTime, eventDelay, req.Full) + } else { + klog.Infof("Push debounce stable[%d] %d for config %s: %v since last change, %v since last push, full=%v", + pushCounter, debouncedEvents, configsUpdated(req), + quietTime, eventDelay, req.Full) + } + free = false + go push(req, debouncedEvents, startDebounce) + req 
= nil + debouncedEvents = 0 + } + } else { + timeChan = time.After(opts.DebounceAfter - quietTime) + } + } + + for { + select { + case <-freeCh: + free = true + pushWorker() + case r := <-ch: + // If reason is not set, record it as an unknown reason + if len(r.Reason) == 0 { + r.Reason = model.NewReasonStats(model.UnknownTrigger) + } + if !opts.enableEDSDebounce && !r.Full { + // trigger push now, just for EDS + go func(req *model.PushRequest) { + pushFn(req) + updateSent.Inc() + }(r) + continue + } + + lastConfigUpdateTime = time.Now() + if debouncedEvents == 0 { + timeChan = time.After(opts.DebounceAfter) + startDebounce = lastConfigUpdateTime + } + debouncedEvents++ + + req = req.Merge(r) + case <-timeChan: + if free { + pushWorker() + } + case <-stopCh: + return + } + } } -func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {} +func reasonsUpdated(req *model.PushRequest) string { + var ( + reason0, reason1 model.TriggerReason + reason0Cnt, reason1Cnt, idx int + ) + for r, cnt := range req.Reason { + if idx == 0 { + reason0, reason0Cnt = r, cnt + } else if idx == 1 { + reason1, reason1Cnt = r, cnt + } else { + break + } + idx++ + } + + switch len(req.Reason) { + case 0: + return "unknown" + case 1: + return fmt.Sprintf("%s:%d", reason0, reason0Cnt) + case 2: + return fmt.Sprintf("%s:%d and %s:%d", reason0, reason0Cnt, reason1, reason1Cnt) + default: + return fmt.Sprintf("%s:%d and %d(%d) more reasons", reason0, reason0Cnt, len(req.Reason)-1, + req.Reason.Count()-reason0Cnt) + } +} + +func configsUpdated(req *model.PushRequest) string { + configs := "" + for key := range req.ConfigsUpdated { + configs += key.String() + break + } + if len(req.ConfigsUpdated) > 1 { + more := " and " + strconv.Itoa(len(req.ConfigsUpdated)-1) + " more configs" + configs += more + } + return configs +} + +func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) { + for { + select { + case <-stopCh: + return + default: + semaphore <- struct{}{} + + // Get the next proxy to push. This will block if there are no updates required. + client, push, shuttingdown := queue.Dequeue() + if shuttingdown { + return + } + doneFunc := func() { + queue.MarkDone(client) + <-semaphore + } + + var closed <-chan struct{} + if client.deltaStream != nil { + closed = client.deltaStream.Context().Done() + } else { + closed = client.StreamDone() + } + go func() { + pushEv := &Event{ + pushRequest: push, + done: doneFunc, + } + + select { + case client.PushCh() <- pushEv: + return + case <-closed: // grpc stream was closed + doneFunc() + klog.Infof("Client closed connection %v", client.ID()) + } + }() + } + } +}
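For context on the new hashing helper: pkg/util/hash wraps github.com/cespare/xxhash/v2 behind a small interface, and ConfigKey.HashCode in sail/pkg/model/config.go uses it to derive a stable 64-bit key, writing the kind byte and then namespace and name with "/" separators to avoid collisions. Below is a minimal sketch of that pattern using the upstream xxhash API directly; the configKey type is an illustrative stand-in, not the committed model.ConfigKey.

package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2" // same digest the new pkg/util/hash wrapper is built on
)

// configKey mirrors the shape of model.ConfigKey for illustration only.
type configKey struct {
	Kind      byte
	Namespace string
	Name      string
}

// hashCode follows the pattern used by ConfigKey.HashCode in the diff:
// write the kind byte, then namespace and name separated by "/" so that
// ("a", "b/c") and ("a/b", "c") cannot collide.
func (k configKey) hashCode() uint64 {
	h := xxhash.New()
	_, _ = h.Write([]byte{k.Kind})
	_, _ = h.WriteString("/")
	_, _ = h.WriteString(k.Namespace)
	_, _ = h.WriteString("/")
	_, _ = h.WriteString(k.Name)
	return h.Sum64()
}

func main() {
	fmt.Println(configKey{Kind: 1, Namespace: "default", Name: "reviews"}.hashCode())
}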
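The reworked PushRequest carries ReasonStats, a per-TriggerReason counter that is summed rather than deduplicated on merge, so the debounce log lines can report how many events each trigger contributed. A small stand-alone sketch of that counting behavior, with types simplified from the diff:

package main

import "fmt"

type triggerReason string

// reasonStats counts how many debounced events each trigger contributed,
// mirroring ReasonStats in sail/pkg/model/push_context.go.
type reasonStats map[triggerReason]int

func (r reasonStats) add(reason triggerReason) { r[reason]++ }

func (r reasonStats) merge(other reasonStats) {
	for reason, count := range other {
		r[reason] += count
	}
}

func (r reasonStats) count() int {
	total := 0
	for _, c := range r {
		total += c
	}
	return total
}

func main() {
	a := reasonStats{"config": 2}
	b := reasonStats{"config": 1, "unknown": 1}
	a.merge(b) // counts are summed, not deduplicated, so merged requests are not under-counted
	fmt.Println(a, "total events:", a.count())
}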
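The new debounce loop in sail/pkg/xds/discovery.go collapses bursts of push requests: a push fires once the channel has been quiet for DebounceAfter, or once debounceMax has elapsed since the first event of the burst. The following is a self-contained sketch of that quiet-time/max-delay pattern; the names, durations, and string payload are illustrative, not the committed API, and it omits the "free" gating and goroutine hand-off the real code performs.

package main

import (
	"fmt"
	"time"
)

// debounce collapses bursts of events into a single call to pushFn.
// A burst is flushed once the channel has been quiet for quiet, or once
// maxDelay has passed since the first event of the current burst.
func debounce(ch <-chan string, stop <-chan struct{}, quiet, maxDelay time.Duration, pushFn func(merged []string)) {
	var pending []string
	var timer <-chan time.Time
	var burstStart, lastEvent time.Time

	flush := func() {
		if len(pending) == 0 {
			return
		}
		if time.Since(burstStart) >= maxDelay || time.Since(lastEvent) >= quiet {
			pushFn(pending)
			pending = nil
			timer = nil
			return
		}
		// Not quiet for long enough yet: re-arm the timer for the remaining quiet window.
		timer = time.After(quiet - time.Since(lastEvent))
	}

	for {
		select {
		case ev := <-ch:
			if len(pending) == 0 {
				burstStart = time.Now()
				timer = time.After(quiet)
			}
			lastEvent = time.Now()
			pending = append(pending, ev) // the real code merges PushRequests via req.Merge(r)
		case <-timer:
			flush()
		case <-stop:
			return
		}
	}
}

func main() {
	ch := make(chan string)
	stop := make(chan struct{})
	go debounce(ch, stop, 100*time.Millisecond, time.Second, func(merged []string) {
		fmt.Println("pushing once for", len(merged), "debounced events")
	})
	for i := 0; i < 5; i++ {
		ch <- "config-change"
		time.Sleep(10 * time.Millisecond)
	}
	time.Sleep(300 * time.Millisecond)
	close(stop)
}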
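doSendPushes limits concurrent pushes with a buffered-channel semaphore (concurrentPushLimit): a slot is taken before a client is handled and released when the push completes or the client's stream closes. A minimal sketch of that bounded-concurrency pattern follows; the queue and client here are simplified stand-ins for PushQueue and Connection, not the committed types.

package main

import (
	"fmt"
	"time"
)

// sendPushes drains queued clients while a buffered channel caps how many
// pushes run at once, mirroring the concurrentPushLimit semaphore pattern.
func sendPushes(queue <-chan string, stop <-chan struct{}, sem chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case client := <-queue:
			sem <- struct{}{} // acquire a slot; blocks once the limit is reached
			go func(c string) {
				defer func() { <-sem }()          // release the slot when done (MarkDone + <-semaphore in the real code)
				time.Sleep(50 * time.Millisecond) // stand-in for generating and sending the xDS response
				fmt.Println("pushed to", c)
			}(client)
		}
	}
}

func main() {
	queue := make(chan string)
	stop := make(chan struct{})
	sem := make(chan struct{}, 2) // at most 2 concurrent pushes

	go sendPushes(queue, stop, sem)
	for i := 0; i < 5; i++ {
		queue <- fmt.Sprintf("proxy-%d", i)
	}
	time.Sleep(500 * time.Millisecond) // let the workers drain before exiting
	close(stop)
}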