This is an automated email from the ASF dual-hosted git repository.

robocanic pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1e55d306 Implment Counter in local cache (#1345)
1e55d306 is described below

commit 1e55d30635381f76d9e94478082a74b351887283
Author: WyRainBow <[email protected]>
AuthorDate: Mon Nov 10 20:57:05 2025 +0800

    Implment Counter in local cache (#1345)
---
 pkg/console/context/context.go   |  14 ++
 pkg/console/counter/component.go |  73 ++++++++++
 pkg/console/counter/counter.go   | 104 +++++++++++++
 pkg/console/counter/manager.go   | 306 +++++++++++++++++++++++++++++++++++++++
 pkg/console/handler/overview.go  |  11 +-
 pkg/console/model/overview.go    |  12 +-
 pkg/core/bootstrap/bootstrap.go  |  15 +-
 pkg/core/events/component.go     |   6 +-
 8 files changed, 530 insertions(+), 11 deletions(-)

diff --git a/pkg/console/context/context.go b/pkg/console/context/context.go
index f423ca09..9133c4f9 100644
--- a/pkg/console/context/context.go
+++ b/pkg/console/context/context.go
@@ -21,12 +21,14 @@ import (
        ctx "context"
 
        "github.com/apache/dubbo-admin/pkg/config/app"
+       "github.com/apache/dubbo-admin/pkg/console/counter"
        "github.com/apache/dubbo-admin/pkg/core/manager"
        "github.com/apache/dubbo-admin/pkg/core/runtime"
 )
 
 type Context interface {
        ResourceManager() manager.ResourceManager
+       CounterManager() counter.CounterManager
 
        Config() app.AdminConfig
 
@@ -57,3 +59,15 @@ func (c *context) ResourceManager() manager.ResourceManager {
        rmc, _ := c.coreRt.GetComponent(runtime.ResourceManager)
        return rmc.(manager.ResourceManagerComponent).ResourceManager()
 }
+
+func (c *context) CounterManager() counter.CounterManager {
+       comp, err := c.coreRt.GetComponent(counter.ComponentType)
+       if err != nil {
+               return nil
+       }
+       managerComp, ok := comp.(counter.ManagerComponent)
+       if !ok {
+               return nil
+       }
+       return managerComp.CounterManager()
+}
diff --git a/pkg/console/counter/component.go b/pkg/console/counter/component.go
new file mode 100644
index 00000000..c38d7fe8
--- /dev/null
+++ b/pkg/console/counter/component.go
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package counter
+
+import (
+       "fmt"
+       "math"
+
+       "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/runtime"
+)
+
+const ComponentType runtime.ComponentType = "counter manager"
+
+func init() {
+       runtime.RegisterComponent(&managerComponent{})
+}
+
+type ManagerComponent interface {
+       runtime.Component
+       CounterManager() CounterManager
+}
+
+var _ ManagerComponent = &managerComponent{}
+
+type managerComponent struct {
+       manager CounterManager
+}
+
+func (c *managerComponent) Type() runtime.ComponentType {
+       return ComponentType
+}
+
+func (c *managerComponent) Order() int {
+       return math.MaxInt - 1
+}
+
+func (c *managerComponent) Init(runtime.BuilderContext) error {
+       mgr := NewCounterManager()
+       c.manager = mgr
+       return nil
+}
+
+func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
+       component, err := rt.GetComponent(runtime.EventBus)
+       if err != nil {
+               return err
+       }
+       bus, ok := component.(events.EventBus)
+       if !ok {
+               return fmt.Errorf("component %s does not implement events.EventBus", runtime.EventBus)
+       }
+       return c.manager.Bind(bus)
+}
+
+func (c *managerComponent) CounterManager() CounterManager {
+       return c.manager
+}
diff --git a/pkg/console/counter/counter.go b/pkg/console/counter/counter.go
new file mode 100644
index 00000000..c32d3516
--- /dev/null
+++ b/pkg/console/counter/counter.go
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package counter
+
+import (
+       "sync"
+       "sync/atomic"
+)
+
+type Counter struct {
+       name  string
+       value atomic.Int64
+}
+
+func NewCounter(name string) *Counter {
+       return &Counter{name: name}
+}
+
+func (c *Counter) Get() int64 {
+       return c.value.Load()
+}
+
+func (c *Counter) Increment() {
+       c.value.Add(1)
+}
+
+func (c *Counter) Decrement() {
+       for {
+               current := c.value.Load()
+               if current == 0 {
+                       return
+               }
+               if c.value.CompareAndSwap(current, current-1) {
+                       return
+               }
+       }
+}
+
+func (c *Counter) Reset() {
+       c.value.Store(0)
+}
+
+type DistributionCounter struct {
+       name string
+       data map[string]int64
+       mu   sync.RWMutex
+}
+
+func NewDistributionCounter(name string) *DistributionCounter {
+       return &DistributionCounter{
+               name: name,
+               data: make(map[string]int64),
+       }
+}
+
+func (c *DistributionCounter) Increment(key string) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.data[key]++
+}
+
+func (c *DistributionCounter) Decrement(key string) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if value, ok := c.data[key]; ok {
+               value--
+               if value <= 0 {
+                       delete(c.data, key)
+               } else {
+                       c.data[key] = value
+               }
+       }
+}
+
+func (c *DistributionCounter) GetAll() map[string]int64 {
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       result := make(map[string]int64, len(c.data))
+       for k, v := range c.data {
+               result[k] = v
+       }
+       return result
+}
+
+func (c *DistributionCounter) Reset() {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.data = make(map[string]int64)
+}
diff --git a/pkg/console/counter/manager.go b/pkg/console/counter/manager.go
new file mode 100644
index 00000000..f9fb7ea3
--- /dev/null
+++ b/pkg/console/counter/manager.go
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package counter
+
+import (
+       "fmt"
+
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/core/events"
+       meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+)
+
+type CounterType string
+
+type FieldExtractor func(resmodel.Resource) string
+
+const (
+       ProtocolCounter  CounterType = "protocol"
+       ReleaseCounter   CounterType = "release"
+       DiscoveryCounter CounterType = "discovery"
+)
+
+type CounterManager interface {
+       RegisterSimpleCounter(kind resmodel.ResourceKind)
+       RegisterDistributionCounter(kind resmodel.ResourceKind, metric CounterType, extractor FieldExtractor)
+       Count(kind resmodel.ResourceKind) int64
+       Distribution(metric CounterType) map[string]int64
+       Reset()
+       Bind(bus events.EventBus) error
+}
+
+type distributionCounterConfig struct {
+       counterType CounterType
+       counter     *DistributionCounter
+       extractor   func(resmodel.Resource) string
+}
+
+type counterManager struct {
+       simpleCounters      map[resmodel.ResourceKind]*Counter
+       distributionConfigs map[resmodel.ResourceKind][]*distributionCounterConfig
+       distributionByType  map[CounterType]*DistributionCounter
+}
+
+func NewCounterManager() CounterManager {
+       return newCounterManager()
+}
+
+func newCounterManager() *counterManager {
+       cm := &counterManager{
+               simpleCounters:      make(map[resmodel.ResourceKind]*Counter),
+               distributionConfigs: make(map[resmodel.ResourceKind][]*distributionCounterConfig),
+               distributionByType:  make(map[CounterType]*DistributionCounter),
+       }
+
+       cm.RegisterSimpleCounter(meshresource.ApplicationKind)
+       cm.RegisterSimpleCounter(meshresource.ServiceKind)
+       cm.RegisterSimpleCounter(meshresource.InstanceKind)
+
+       cm.RegisterDistributionCounter(meshresource.InstanceKind, ProtocolCounter, instanceProtocolKey)
+       cm.RegisterDistributionCounter(meshresource.InstanceKind, ReleaseCounter, instanceReleaseKey)
+       cm.RegisterDistributionCounter(meshresource.InstanceKind, DiscoveryCounter, instanceMeshKey)
+
+       return cm
+}
+
+func (cm *counterManager) RegisterSimpleCounter(kind resmodel.ResourceKind) {
+       if kind == "" {
+               return
+       }
+       if _, exists := cm.simpleCounters[kind]; exists {
+               return
+       }
+       cm.simpleCounters[kind] = NewCounter(string(kind))
+}
+
+func (cm *counterManager) RegisterDistributionCounter(kind resmodel.ResourceKind, metric CounterType, extractor FieldExtractor) {
+       if kind == "" || metric == "" {
+               return
+       }
+       counter := cm.distributionByType[metric]
+       if counter == nil {
+               counter = NewDistributionCounter(string(metric))
+               cm.distributionByType[metric] = counter
+       }
+
+       configs := cm.distributionConfigs[kind]
+       for _, cfg := range configs {
+               if cfg.counterType == metric {
+                       cfg.counter = counter
+                       cfg.extractor = extractor
+                       return
+               }
+       }
+
+       cm.distributionConfigs[kind] = append(configs, &distributionCounterConfig{
+               counterType: metric,
+               counter:     counter,
+               extractor:   extractor,
+       })
+}
+
+func (cm *counterManager) Reset() {
+       for _, counter := range cm.simpleCounters {
+               counter.Reset()
+       }
+       for _, counter := range cm.distributionByType {
+               counter.Reset()
+       }
+}
+
+func (cm *counterManager) Count(kind resmodel.ResourceKind) int64 {
+       if counter, exists := cm.simpleCounters[kind]; exists {
+               return counter.Get()
+       }
+       return 0
+}
+
+func (cm *counterManager) Distribution(metric CounterType) map[string]int64 {
+       counter, exists := cm.distributionByType[metric]
+       if !exists {
+               return map[string]int64{}
+       }
+       raw := counter.GetAll()
+       result := make(map[string]int64, len(raw))
+       for k, v := range raw {
+               result[k] = v
+       }
+       return result
+}
+
+func (cm *counterManager) Bind(bus events.EventBus) error {
+       handledKinds := make(map[resmodel.ResourceKind]struct{})
+       for kind := range cm.simpleCounters {
+               handledKinds[kind] = struct{}{}
+       }
+       for kind := range cm.distributionConfigs {
+               handledKinds[kind] = struct{}{}
+       }
+
+       for kind := range handledKinds {
+               resourceKind := kind
+               name := fmt.Sprintf("counter-manager/%s", resourceKind)
+               subscriber := &counterEventSubscriber{
+                       kind: resourceKind,
+                       name: name,
+                       handler: func(event events.Event) error {
+                               return cm.handleEvent(resourceKind, event)
+                       },
+               }
+               if err := bus.Subscribe(subscriber); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event events.Event) error {
+       if counter := cm.simpleCounters[kind]; counter != nil {
+               processSimpleCounter(counter, event)
+       }
+       if configs := cm.distributionConfigs[kind]; len(configs) > 0 {
+               for _, cfg := range configs {
+                       processDistributionCounter(cfg, event)
+               }
+       }
+       return nil
+}
+
+func processSimpleCounter(counter *Counter, event events.Event) {
+       switch event.Type() {
+       case cache.Added:
+               counter.Increment()
+       case cache.Sync, cache.Replaced:
+               if isNewResourceEvent(event) {
+                       counter.Increment()
+               }
+       case cache.Deleted:
+               counter.Decrement()
+       case cache.Updated:
+               // no-op for simple counters
+       default:
+       }
+}
+
+func processDistributionCounter(cfg *distributionCounterConfig, event events.Event) {
+       switch event.Type() {
+       case cache.Added:
+               cfg.increment(cfg.extractFrom(event.NewObj()))
+       case cache.Sync, cache.Replaced:
+               if isNewResourceEvent(event) {
+                       cfg.increment(cfg.extractFrom(event.NewObj()))
+               } else {
+                       cfg.update(event.OldObj(), event.NewObj())
+               }
+       case cache.Updated:
+               cfg.update(event.OldObj(), event.NewObj())
+       case cache.Deleted:
+               cfg.decrement(cfg.extractFrom(event.OldObj()))
+       default:
+       }
+}
+
+func (cfg *distributionCounterConfig) extractFrom(res resmodel.Resource) string {
+       if cfg.extractor == nil || res == nil {
+               return ""
+       }
+       return cfg.extractor(res)
+}
+
+func (cfg *distributionCounterConfig) increment(key string) {
+       cfg.counter.Increment(normalizeDistributionKey(key))
+}
+
+func (cfg *distributionCounterConfig) decrement(key string) {
+       cfg.counter.Decrement(normalizeDistributionKey(key))
+}
+
+func (cfg *distributionCounterConfig) update(oldObj, newObj resmodel.Resource) {
+       oldKey := normalizeDistributionKey(cfg.extractFrom(oldObj))
+       newKey := normalizeDistributionKey(cfg.extractFrom(newObj))
+       if oldKey == newKey {
+               return
+       }
+       if oldObj != nil {
+               cfg.counter.Decrement(oldKey)
+       }
+       if newObj != nil {
+               cfg.counter.Increment(newKey)
+       }
+}
+
+func instanceProtocolKey(res resmodel.Resource) string {
+       instance, ok := res.(*meshresource.InstanceResource)
+       if !ok || instance == nil || instance.Spec == nil {
+               return ""
+       }
+       return instance.Spec.GetProtocol()
+}
+
+func instanceReleaseKey(res resmodel.Resource) string {
+       instance, ok := res.(*meshresource.InstanceResource)
+       if !ok || instance == nil || instance.Spec == nil {
+               return ""
+       }
+       return instance.Spec.GetReleaseVersion()
+}
+
+func instanceMeshKey(res resmodel.Resource) string {
+       instance, ok := res.(*meshresource.InstanceResource)
+       if !ok || instance == nil {
+               return ""
+       }
+       return instance.Mesh
+}
+
+func normalizeDistributionKey(key string) string {
+       if key == "" {
+               return "unknown"
+       }
+       return key
+}
+
+func isNewResourceEvent(event events.Event) bool {
+       if event == nil {
+               return false
+       }
+       return event.OldObj() == nil
+}
+
+type counterEventSubscriber struct {
+       kind    resmodel.ResourceKind
+       name    string
+       handler func(events.Event) error
+}
+
+func (s *counterEventSubscriber) ResourceKind() resmodel.ResourceKind {
+       return s.kind
+}
+
+func (s *counterEventSubscriber) Name() string {
+       return s.name
+}
+
+func (s *counterEventSubscriber) ProcessEvent(event events.Event) error {
+       if s.handler == nil {
+               return nil
+       }
+       return s.handler(event)
+}
diff --git a/pkg/console/handler/overview.go b/pkg/console/handler/overview.go
index ae1a1e6d..c2aac533 100644
--- a/pkg/console/handler/overview.go
+++ b/pkg/console/handler/overview.go
@@ -23,7 +23,9 @@ import (
        "github.com/gin-gonic/gin"
 
        consolectx "github.com/apache/dubbo-admin/pkg/console/context"
+       "github.com/apache/dubbo-admin/pkg/console/counter"
        "github.com/apache/dubbo-admin/pkg/console/model"
+       meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
 )
 
 func AdminMetadata(ctx consolectx.Context) gin.HandlerFunc {
@@ -38,7 +40,14 @@ func AdminMetadata(ctx consolectx.Context) gin.HandlerFunc {
 func ClusterOverview(ctx consolectx.Context) gin.HandlerFunc {
        return func(c *gin.Context) {
                resp := model.NewOverviewResp()
-
+               if counterMgr := ctx.CounterManager(); counterMgr != nil {
+                       resp.AppCount = counterMgr.Count(meshresource.ApplicationKind)
+                       resp.ServiceCount = counterMgr.Count(meshresource.ServiceKind)
+                       resp.InsCount = counterMgr.Count(meshresource.InstanceKind)
+                       resp.Protocols = counterMgr.Distribution(counter.ProtocolCounter)
+                       resp.Releases = counterMgr.Distribution(counter.ReleaseCounter)
+                       resp.Discoveries = counterMgr.Distribution(counter.DiscoveryCounter)
+               }
                c.JSON(http.StatusOK, model.NewSuccessResp(resp))
        }
 }
diff --git a/pkg/console/model/overview.go b/pkg/console/model/overview.go
index 2a467a92..6ffbf0e0 100644
--- a/pkg/console/model/overview.go
+++ b/pkg/console/model/overview.go
@@ -18,12 +18,12 @@
 package model
 
 type OverviewResp struct {
-       AppCount     int            `json:"appCount"`
-       ServiceCount int            `json:"serviceCount"`
-       InsCount     int            `json:"insCount"`
-       Protocols    map[string]int `json:"protocols"`
-       Releases     map[string]int `json:"releases"`
-       Discoveries  map[string]int `json:"discoveries"`
+       AppCount     int64            `json:"appCount"`
+       ServiceCount int64            `json:"serviceCount"`
+       InsCount     int64            `json:"insCount"`
+       Protocols    map[string]int64 `json:"protocols"`
+       Releases     map[string]int64 `json:"releases"`
+       Discoveries  map[string]int64 `json:"discoveries"`
 }
 
 func NewOverviewResp() *OverviewResp {
diff --git a/pkg/core/bootstrap/bootstrap.go b/pkg/core/bootstrap/bootstrap.go
index 213c817f..7abcae7d 100644
--- a/pkg/core/bootstrap/bootstrap.go
+++ b/pkg/core/bootstrap/bootstrap.go
@@ -23,6 +23,7 @@ import (
        "github.com/pkg/errors"
 
        "github.com/apache/dubbo-admin/pkg/config/app"
+       "github.com/apache/dubbo-admin/pkg/console/counter"
        "github.com/apache/dubbo-admin/pkg/core/logger"
        "github.com/apache/dubbo-admin/pkg/core/runtime"
        "github.com/apache/dubbo-admin/pkg/diagnostics"
@@ -57,7 +58,11 @@ func Bootstrap(appCtx context.Context, cfg app.AdminConfig) (runtime.Runtime, er
        if err := initializeConsole(builder); err != nil {
                return nil, err
        }
-       // 6. initialize diagnotics
+       // 6. initialize counter manager
+       if err := initializeCounterManager(builder); err != nil {
+               return nil, err
+       }
+       // 7. initialize diagnostics
        if err := initializeDiagnoticsServer(builder); err != nil {
                logger.Errorf("got error when init diagnotics server %s", err)
        }
@@ -123,6 +128,14 @@ func initializeDiagnoticsServer(builder *runtime.Builder) error {
        return initAndActivateComponent(builder, comp)
 }
 
+func initializeCounterManager(builder *runtime.Builder) error {
+       comp, err := runtime.ComponentRegistry().Get(counter.ComponentType)
+       if err != nil {
+               return err
+       }
+       return initAndActivateComponent(builder, comp)
+}
+
 func initAndActivateComponent(builder *runtime.Builder, comp runtime.Component) error {
        logger.Infof("initializing %s ...", comp.Type())
        if err := comp.Init(builder); err != nil {
diff --git a/pkg/core/events/component.go b/pkg/core/events/component.go
index 6c1fff9a..2e6bfc0f 100644
--- a/pkg/core/events/component.go
+++ b/pkg/core/events/component.go
@@ -100,10 +100,10 @@ func (b *eventBus) Send(event Event) {
        b.rwMutex.RLock()
        defer b.rwMutex.RUnlock()
        var rk model.ResourceKind
-       if event.OldObj() != nil {
-               rk = event.OldObj().ResourceKind()
-       } else if event.NewObj() != nil {
+       if event.NewObj() != nil {
                rk = event.NewObj().ResourceKind()
+       } else if event.OldObj() != nil {
+               rk = event.OldObj().ResourceKind()
        }
        subs, exists := b.subscriberDir[rk]
        if !exists {

Reply via email to