This is an automated email from the ASF dual-hosted git repository.

robocanic pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git


The following commit(s) were added to refs/heads/develop by this push:
     new dc5eb0d0 implement counter by key (#1390)
dc5eb0d0 is described below

commit dc5eb0d0a8275898eb2f67580779c92bb4e7b055
Author: WyRainBow <[email protected]>
AuthorDate: Sun Feb 1 16:45:32 2026 +0800

    implement counter by key (#1390)
    
    * implement counter by key
    
    * chore: trigger CI
    
    * Fix counter initialization errors and mesh change detection logic
    
    ---------
    
    Co-authored-by: WyRainBow <[email protected]>
---
 pkg/console/counter/component.go               |  86 ++++++++++++++++-
 pkg/console/counter/counter.go                 | 124 +++++++++++++++++++------
 pkg/console/counter/manager.go                 |  98 ++++++++++++++-----
 pkg/console/handler/overview.go                |  23 +++--
 pkg/core/discovery/subscriber/instance.go      |   1 +
 pkg/core/discovery/subscriber/nacos_service.go |   4 +
 6 files changed, 279 insertions(+), 57 deletions(-)

diff --git a/pkg/console/counter/component.go b/pkg/console/counter/component.go
index 0217ee2f..8c39cb3a 100644
--- a/pkg/console/counter/component.go
+++ b/pkg/console/counter/component.go
@@ -22,7 +22,11 @@ import (
        "math"
 
        "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
        "github.com/apache/dubbo-admin/pkg/core/runtime"
+       "github.com/apache/dubbo-admin/pkg/core/store"
 )
 
 const ComponentType runtime.ComponentType = "counter manager"
@@ -41,7 +45,7 @@ var _ ManagerComponent = &managerComponent{}
 func (c *managerComponent) RequiredDependencies() []runtime.ComponentType {
        return []runtime.ComponentType{
                runtime.ResourceStore,
-               runtime.EventBus, // Counter depends on EventBus to subscribe 
to events
+               runtime.EventBus,
        }
 }
 
@@ -64,6 +68,19 @@ func (c *managerComponent) Init(runtime.BuilderContext) error {
 }
 
 func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
+       storeComponent, err := rt.GetComponent(runtime.ResourceStore)
+       if err != nil {
+               return err
+       }
+       storeRouter, ok := storeComponent.(store.Router)
+       if !ok {
+               return fmt.Errorf("component %s does not implement 
store.Router", runtime.ResourceStore)
+       }
+
+       if err := c.initializeCountsFromStore(storeRouter); err != nil {
+               logger.Warnf("Failed to initialize counter manager from store: 
%v", err)
+       }
+
        component, err := rt.GetComponent(runtime.EventBus)
        if err != nil {
                return err
@@ -75,6 +92,73 @@ func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
        return c.manager.Bind(bus)
 }
 
+func (c *managerComponent) initializeCountsFromStore(storeRouter store.Router) 
error {
+       if err := c.initializeResourceCount(storeRouter, 
meshresource.InstanceKind); err != nil {
+               return fmt.Errorf("failed to initialize instance count: %w", 
err)
+       }
+
+       if err := c.initializeResourceCount(storeRouter, 
meshresource.ApplicationKind); err != nil {
+               return fmt.Errorf("failed to initialize application count: %w", 
err)
+       }
+
+       if err := c.initializeResourceCount(storeRouter, 
meshresource.ServiceProviderMetadataKind); err != nil {
+               return fmt.Errorf("failed to initialize service provider 
metadata count: %w", err)
+       }
+
+       return nil
+}
+
+func (c *managerComponent) initializeResourceCount(storeRouter store.Router, 
kind resmodel.ResourceKind) error {
+       resourceStore, err := storeRouter.ResourceKindRoute(kind)
+       if err != nil {
+               return err
+       }
+
+       allResources := resourceStore.List()
+       cm := c.manager.(*counterManager)
+
+       for _, obj := range allResources {
+               resource, ok := obj.(resmodel.Resource)
+               if !ok {
+                       continue
+               }
+
+               mesh := resource.ResourceMesh()
+               if mesh == "" {
+                       mesh = "default"
+               }
+
+               if counter, exists := cm.simpleCounters[kind]; exists {
+                       counter.Increment(mesh)
+               }
+
+               if kind == meshresource.InstanceKind {
+                       instance, ok := 
resource.(*meshresource.InstanceResource)
+                       if ok && instance.Spec != nil {
+                               protocol := instance.Spec.GetProtocol()
+                               if protocol != "" {
+                                       if cfg := 
cm.getDistributionConfig(kind, ProtocolCounter); cfg != nil {
+                                               cfg.counter.Increment(mesh, 
protocol)
+                                       }
+                               }
+
+                               releaseVersion := 
instance.Spec.GetReleaseVersion()
+                               if releaseVersion != "" {
+                                       if cfg := 
cm.getDistributionConfig(kind, ReleaseCounter); cfg != nil {
+                                               cfg.counter.Increment(mesh, 
releaseVersion)
+                                       }
+                               }
+
+                               if cfg := cm.getDistributionConfig(kind, 
DiscoveryCounter); cfg != nil {
+                                       cfg.counter.Increment(mesh, mesh)
+                               }
+                       }
+               }
+       }
+
+       return nil
+}
+
 func (c *managerComponent) CounterManager() CounterManager {
        return c.manager
 }
diff --git a/pkg/console/counter/counter.go b/pkg/console/counter/counter.go
index c32d3516..6d5e2a1a 100644
--- a/pkg/console/counter/counter.go
+++ b/pkg/console/counter/counter.go
@@ -19,70 +19,121 @@ package counter
 
 import (
        "sync"
-       "sync/atomic"
 )
 
 type Counter struct {
-       name  string
-       value atomic.Int64
+       name string
+       data map[string]int64
+       mu   sync.RWMutex
 }
 
 func NewCounter(name string) *Counter {
-       return &Counter{name: name}
+       return &Counter{
+               name: name,
+               data: make(map[string]int64),
+       }
 }
 
 func (c *Counter) Get() int64 {
-       return c.value.Load()
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       var sum int64
+       for _, v := range c.data {
+               sum += v
+       }
+       return sum
 }
 
-func (c *Counter) Increment() {
-       c.value.Add(1)
+func (c *Counter) GetByGroup(group string) int64 {
+       if group == "" {
+               group = "default"
+       }
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       return c.data[group]
 }
 
-func (c *Counter) Decrement() {
-       for {
-               current := c.value.Load()
-               if current == 0 {
-                       return
-               }
-               if c.value.CompareAndSwap(current, current-1) {
-                       return
+func (c *Counter) Increment(group string) {
+       if group == "" {
+               group = "default"
+       }
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.data[group]++
+}
+
+func (c *Counter) Decrement(group string) {
+       if group == "" {
+               group = "default"
+       }
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if value, ok := c.data[group]; ok {
+               value--
+               if value <= 0 {
+                       delete(c.data, group)
+               } else {
+                       c.data[group] = value
                }
        }
 }
 
 func (c *Counter) Reset() {
-       c.value.Store(0)
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.data = make(map[string]int64)
 }
 
 type DistributionCounter struct {
        name string
-       data map[string]int64
+       data map[string]map[string]int64
        mu   sync.RWMutex
 }
 
 func NewDistributionCounter(name string) *DistributionCounter {
        return &DistributionCounter{
                name: name,
-               data: make(map[string]int64),
+               data: make(map[string]map[string]int64),
        }
 }
 
-func (c *DistributionCounter) Increment(key string) {
+func (c *DistributionCounter) Increment(group, key string) {
+       if group == "" {
+               group = "default"
+       }
+       if key == "" {
+               key = "unknown"
+       }
        c.mu.Lock()
        defer c.mu.Unlock()
-       c.data[key]++
+       if c.data[group] == nil {
+               c.data[group] = make(map[string]int64)
+       }
+       c.data[group][key]++
 }
 
-func (c *DistributionCounter) Decrement(key string) {
+func (c *DistributionCounter) Decrement(group, key string) {
+       if group == "" {
+               group = "default"
+       }
+       if key == "" {
+               key = "unknown"
+       }
        c.mu.Lock()
        defer c.mu.Unlock()
-       if value, ok := c.data[key]; ok {
+       groupData, exists := c.data[group]
+       if !exists {
+               return
+       }
+       if value, ok := groupData[key]; ok {
                value--
                if value <= 0 {
-                       delete(c.data, key)
+                       delete(groupData, key)
+                       if len(groupData) == 0 {
+                               delete(c.data, group)
+                       }
                } else {
-                       c.data[key] = value
+                       groupData[key] = value
                }
        }
 }
@@ -90,8 +141,27 @@ func (c *DistributionCounter) Decrement(key string) {
 func (c *DistributionCounter) GetAll() map[string]int64 {
        c.mu.RLock()
        defer c.mu.RUnlock()
-       result := make(map[string]int64, len(c.data))
-       for k, v := range c.data {
+       result := make(map[string]int64)
+       for _, groupData := range c.data {
+               for k, v := range groupData {
+                       result[k] += v
+               }
+       }
+       return result
+}
+
+func (c *DistributionCounter) GetByGroup(group string) map[string]int64 {
+       if group == "" {
+               group = "default"
+       }
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       groupData, exists := c.data[group]
+       if !exists {
+               return map[string]int64{}
+       }
+       result := make(map[string]int64, len(groupData))
+       for k, v := range groupData {
                result[k] = v
        }
        return result
@@ -100,5 +170,5 @@ func (c *DistributionCounter) GetAll() map[string]int64 {
 func (c *DistributionCounter) Reset() {
        c.mu.Lock()
        defer c.mu.Unlock()
-       c.data = make(map[string]int64)
+       c.data = make(map[string]map[string]int64)
 }
diff --git a/pkg/console/counter/manager.go b/pkg/console/counter/manager.go
index b8d0d549..581139fa 100644
--- a/pkg/console/counter/manager.go
+++ b/pkg/console/counter/manager.go
@@ -23,6 +23,7 @@ import (
        "k8s.io/client-go/tools/cache"
 
        "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
        meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
        resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
 )
@@ -42,6 +43,8 @@ type CounterManager interface {
        RegisterDistributionCounter(kind resmodel.ResourceKind, metric 
CounterType, extractor FieldExtractor)
        Count(kind resmodel.ResourceKind) int64
        Distribution(metric CounterType) map[string]int64
+       CountByMesh(kind resmodel.ResourceKind, mesh string) int64
+       DistributionByMesh(metric CounterType, mesh string) map[string]int64
        Reset()
        Bind(bus events.EventBus) error
 }
@@ -137,12 +140,28 @@ func (cm *counterManager) Distribution(metric CounterType) map[string]int64 {
        if !exists {
                return map[string]int64{}
        }
-       raw := counter.GetAll()
-       result := make(map[string]int64, len(raw))
-       for k, v := range raw {
-               result[k] = v
+       return counter.GetAll()
+}
+
+func (cm *counterManager) CountByMesh(kind resmodel.ResourceKind, mesh string) 
int64 {
+       if mesh == "" {
+               mesh = "default"
+       }
+       if counter, exists := cm.simpleCounters[kind]; exists {
+               return counter.GetByGroup(mesh)
+       }
+       return 0
+}
+
+func (cm *counterManager) DistributionByMesh(metric CounterType, mesh string) 
map[string]int64 {
+       if mesh == "" {
+               mesh = "default"
        }
-       return result
+       counter, exists := cm.distributionByType[metric]
+       if !exists {
+               return map[string]int64{}
+       }
+       return counter.GetByGroup(mesh)
 }
 
 func (cm *counterManager) Bind(bus events.EventBus) error {
@@ -167,11 +186,14 @@ func (cm *counterManager) Bind(bus events.EventBus) error {
                if err := bus.Subscribe(subscriber); err != nil {
                        return err
                }
+               logger.Infof("CounterManager subscribed to %s events", 
resourceKind)
        }
+       logger.Infof("CounterManager bound to EventBus successfully")
        return nil
 }
 
 func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event 
events.Event) error {
+       logger.Debugf("CounterManager handling %s event, type: %s", kind, 
event.Type())
        if counter := cm.simpleCounters[kind]; counter != nil {
                processSimpleCounter(counter, event)
        }
@@ -183,36 +205,72 @@ func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event events.E
        return nil
 }
 
+func (cm *counterManager) getDistributionConfig(kind resmodel.ResourceKind, 
metric CounterType) *distributionCounterConfig {
+       configs := cm.distributionConfigs[kind]
+       for _, cfg := range configs {
+               if cfg.counterType == metric {
+                       return cfg
+               }
+       }
+       return nil
+}
+
+func extractMeshName(res resmodel.Resource) string {
+       if res == nil {
+               return "default"
+       }
+       mesh := res.ResourceMesh()
+       if mesh == "" {
+               return "default"
+       }
+       return mesh
+}
+
 func processSimpleCounter(counter *Counter, event events.Event) {
+       mesh := extractMeshName(event.NewObj())
+       if event.NewObj() == nil {
+               mesh = extractMeshName(event.OldObj())
+       }
+
        switch event.Type() {
        case cache.Added:
-               counter.Increment()
+               counter.Increment(mesh)
+               logger.Debugf("CounterManager: Increment %s for mesh=%s, 
current count=%d", counter.name, mesh, counter.GetByGroup(mesh))
        case cache.Sync, cache.Replaced:
                if isNewResourceEvent(event) {
-                       counter.Increment()
+                       counter.Increment(mesh)
+                       logger.Debugf("CounterManager: Increment %s for mesh=%s 
(Sync/Replaced), current count=%d", counter.name, mesh, 
counter.GetByGroup(mesh))
                }
        case cache.Deleted:
-               counter.Decrement()
+               counter.Decrement(mesh)
+               logger.Debugf("CounterManager: Decrement %s for mesh=%s, 
current count=%d", counter.name, mesh, counter.GetByGroup(mesh))
        case cache.Updated:
-               // no-op for simple counters
        default:
        }
 }
 
 func processDistributionCounter(cfg *distributionCounterConfig, event 
events.Event) {
+       mesh := extractMeshName(event.NewObj())
+       if event.NewObj() == nil {
+               mesh = extractMeshName(event.OldObj())
+       }
+
        switch event.Type() {
        case cache.Added:
-               cfg.increment(cfg.extractFrom(event.NewObj()))
+               key := cfg.extractFrom(event.NewObj())
+               cfg.counter.Increment(mesh, normalizeDistributionKey(key))
        case cache.Sync, cache.Replaced:
                if isNewResourceEvent(event) {
-                       cfg.increment(cfg.extractFrom(event.NewObj()))
+                       key := cfg.extractFrom(event.NewObj())
+                       cfg.counter.Increment(mesh, 
normalizeDistributionKey(key))
                } else {
                        cfg.update(event.OldObj(), event.NewObj())
                }
        case cache.Updated:
                cfg.update(event.OldObj(), event.NewObj())
        case cache.Deleted:
-               cfg.decrement(cfg.extractFrom(event.OldObj()))
+               key := cfg.extractFrom(event.OldObj())
+               cfg.counter.Decrement(mesh, normalizeDistributionKey(key))
        default:
        }
 }
@@ -224,25 +282,19 @@ func (cfg *distributionCounterConfig) extractFrom(res resmodel.Resource) string
        return cfg.extractor(res)
 }
 
-func (cfg *distributionCounterConfig) increment(key string) {
-       cfg.counter.Increment(normalizeDistributionKey(key))
-}
-
-func (cfg *distributionCounterConfig) decrement(key string) {
-       cfg.counter.Decrement(normalizeDistributionKey(key))
-}
-
 func (cfg *distributionCounterConfig) update(oldObj, newObj resmodel.Resource) 
{
        oldKey := normalizeDistributionKey(cfg.extractFrom(oldObj))
        newKey := normalizeDistributionKey(cfg.extractFrom(newObj))
-       if oldKey == newKey {
+       oldMesh := extractMeshName(oldObj)
+       newMesh := extractMeshName(newObj)
+       if oldKey == newKey && oldMesh == newMesh {
                return
        }
        if oldObj != nil {
-               cfg.counter.Decrement(oldKey)
+               cfg.counter.Decrement(oldMesh, oldKey)
        }
        if newObj != nil {
-               cfg.counter.Increment(newKey)
+               cfg.counter.Increment(newMesh, newKey)
        }
 }
 
diff --git a/pkg/console/handler/overview.go b/pkg/console/handler/overview.go
index d0ad7c01..1d9dc971 100644
--- a/pkg/console/handler/overview.go
+++ b/pkg/console/handler/overview.go
@@ -70,12 +70,23 @@ func ClusterOverview(ctx consolectx.Context) gin.HandlerFunc {
        return func(c *gin.Context) {
                resp := model.NewOverviewResp()
                if counterMgr := ctx.CounterManager(); counterMgr != nil {
-                       resp.AppCount = 
counterMgr.Count(meshresource.ApplicationKind)
-                       resp.ServiceCount = 
counterMgr.Count(meshresource.ServiceProviderMetadataKind)
-                       resp.InsCount = 
counterMgr.Count(meshresource.InstanceKind)
-                       resp.Protocols = 
counterMgr.Distribution(counter.ProtocolCounter)
-                       resp.Releases = 
counterMgr.Distribution(counter.ReleaseCounter)
-                       resp.Discoveries = 
counterMgr.Distribution(counter.DiscoveryCounter)
+                       mesh := c.Query("mesh")
+
+                       if mesh != "" {
+                               resp.AppCount = 
counterMgr.CountByMesh(meshresource.ApplicationKind, mesh)
+                               resp.ServiceCount = 
counterMgr.CountByMesh(meshresource.ServiceProviderMetadataKind, mesh)
+                               resp.InsCount = 
counterMgr.CountByMesh(meshresource.InstanceKind, mesh)
+                               resp.Protocols = 
counterMgr.DistributionByMesh(counter.ProtocolCounter, mesh)
+                               resp.Releases = 
counterMgr.DistributionByMesh(counter.ReleaseCounter, mesh)
+                               resp.Discoveries = 
counterMgr.DistributionByMesh(counter.DiscoveryCounter, mesh)
+                       } else {
+                               resp.AppCount = 
counterMgr.Count(meshresource.ApplicationKind)
+                               resp.ServiceCount = 
counterMgr.Count(meshresource.ServiceProviderMetadataKind)
+                               resp.InsCount = 
counterMgr.Count(meshresource.InstanceKind)
+                               resp.Protocols = 
counterMgr.Distribution(counter.ProtocolCounter)
+                               resp.Releases = 
counterMgr.Distribution(counter.ReleaseCounter)
+                               resp.Discoveries = 
counterMgr.Distribution(counter.DiscoveryCounter)
+                       }
                }
                c.JSON(http.StatusOK, model.NewSuccessResp(resp))
        }
diff --git a/pkg/core/discovery/subscriber/instance.go b/pkg/core/discovery/subscriber/instance.go
index 99714403..ded8a67d 100644
--- a/pkg/core/discovery/subscriber/instance.go
+++ b/pkg/core/discovery/subscriber/instance.go
@@ -71,6 +71,7 @@ func (s *InstanceEventSubscriber) ProcessEvent(event events.Event) error {
                instanceRes = oldObj
        }
        instanceResList, err := s.instanceStore.ListByIndexes(map[string]string{
+               index.ByMeshIndex:            instanceRes.Mesh,
                index.ByInstanceAppNameIndex: instanceRes.Spec.AppName,
        })
 
diff --git a/pkg/core/discovery/subscriber/nacos_service.go b/pkg/core/discovery/subscriber/nacos_service.go
index 6a83771d..e5c9c631 100644
--- a/pkg/core/discovery/subscriber/nacos_service.go
+++ b/pkg/core/discovery/subscriber/nacos_service.go
@@ -128,6 +128,7 @@ func (n *NacosServiceEventSubscriber) processConsumerMetadataUpsert(serviceRes *
                return err
        }
        resources, err := st.ListByIndexes(map[string]string{
+               index.ByMeshIndex:                  serviceRes.Mesh,
                index.ByServiceConsumerServiceName: serviceName,
        })
        if err != nil {
@@ -212,6 +213,7 @@ func (n *NacosServiceEventSubscriber) processRPCInstanceUpsert(serviceRes *meshr
                return err
        }
        resources, err := st.ListByIndexes(map[string]string{
+               index.ByMeshIndex:          serviceRes.Mesh,
                index.ByRPCInstanceAppName: serviceRes.Name,
        })
        if err != nil {
@@ -286,6 +288,7 @@ func (n *NacosServiceEventSubscriber) processServiceConsumerDelete(serviceRes *m
                return err
        }
        resources, err := st.ListByIndexes(map[string]string{
+               index.ByMeshIndex:                  serviceRes.Mesh,
                index.ByServiceConsumerServiceName: serviceRes.Name,
        })
        if err != nil {
@@ -309,6 +312,7 @@ func (n *NacosServiceEventSubscriber) processRPCInstanceDelete(serviceRes *meshr
                return err
        }
        resources, err := st.ListByIndexes(map[string]string{
+               index.ByMeshIndex:          serviceRes.Mesh,
                index.ByRPCInstanceAppName: serviceRes.Name,
        })
        if err != nil {

Reply via email to