This is an automated email from the ASF dual-hosted git repository.
robocanic pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git
The following commit(s) were added to refs/heads/develop by this push:
new dc5eb0d0 implement counter by key (#1390)
dc5eb0d0 is described below
commit dc5eb0d0a8275898eb2f67580779c92bb4e7b055
Author: WyRainBow <[email protected]>
AuthorDate: Sun Feb 1 16:45:32 2026 +0800
implement counter by key (#1390)
* implement counter by key
* chore: trigger CI
* Fix counter initialization errors and mesh change detection logic
---------
Co-authored-by: WyRainBow <[email protected]>
---
pkg/console/counter/component.go | 86 ++++++++++++++++-
pkg/console/counter/counter.go | 124 +++++++++++++++++++------
pkg/console/counter/manager.go | 98 ++++++++++++++-----
pkg/console/handler/overview.go | 23 +++--
pkg/core/discovery/subscriber/instance.go | 1 +
pkg/core/discovery/subscriber/nacos_service.go | 4 +
6 files changed, 279 insertions(+), 57 deletions(-)
diff --git a/pkg/console/counter/component.go b/pkg/console/counter/component.go
index 0217ee2f..8c39cb3a 100644
--- a/pkg/console/counter/component.go
+++ b/pkg/console/counter/component.go
@@ -22,7 +22,11 @@ import (
"math"
"github.com/apache/dubbo-admin/pkg/core/events"
+ "github.com/apache/dubbo-admin/pkg/core/logger"
+ meshresource
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+ resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
"github.com/apache/dubbo-admin/pkg/core/runtime"
+ "github.com/apache/dubbo-admin/pkg/core/store"
)
const ComponentType runtime.ComponentType = "counter manager"
@@ -41,7 +45,7 @@ var _ ManagerComponent = &managerComponent{}
func (c *managerComponent) RequiredDependencies() []runtime.ComponentType {
return []runtime.ComponentType{
runtime.ResourceStore,
- runtime.EventBus, // Counter depends on EventBus to subscribe to events
+ runtime.EventBus,
}
}
@@ -64,6 +68,19 @@ func (c *managerComponent) Init(runtime.BuilderContext) error {
}
func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
+ storeComponent, err := rt.GetComponent(runtime.ResourceStore)
+ if err != nil {
+ return err
+ }
+ storeRouter, ok := storeComponent.(store.Router)
+ if !ok {
+ return fmt.Errorf("component %s does not implement
store.Router", runtime.ResourceStore)
+ }
+
+ if err := c.initializeCountsFromStore(storeRouter); err != nil {
+ logger.Warnf("Failed to initialize counter manager from store:
%v", err)
+ }
+
component, err := rt.GetComponent(runtime.EventBus)
if err != nil {
return err
@@ -75,6 +92,73 @@ func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
return c.manager.Bind(bus)
}
+func (c *managerComponent) initializeCountsFromStore(storeRouter store.Router)
error {
+ if err := c.initializeResourceCount(storeRouter,
meshresource.InstanceKind); err != nil {
+ return fmt.Errorf("failed to initialize instance count: %w",
err)
+ }
+
+ if err := c.initializeResourceCount(storeRouter,
meshresource.ApplicationKind); err != nil {
+ return fmt.Errorf("failed to initialize application count: %w",
err)
+ }
+
+ if err := c.initializeResourceCount(storeRouter,
meshresource.ServiceProviderMetadataKind); err != nil {
+ return fmt.Errorf("failed to initialize service provider
metadata count: %w", err)
+ }
+
+ return nil
+}
+
+func (c *managerComponent) initializeResourceCount(storeRouter store.Router,
kind resmodel.ResourceKind) error {
+ resourceStore, err := storeRouter.ResourceKindRoute(kind)
+ if err != nil {
+ return err
+ }
+
+ allResources := resourceStore.List()
+ cm := c.manager.(*counterManager)
+
+ for _, obj := range allResources {
+ resource, ok := obj.(resmodel.Resource)
+ if !ok {
+ continue
+ }
+
+ mesh := resource.ResourceMesh()
+ if mesh == "" {
+ mesh = "default"
+ }
+
+ if counter, exists := cm.simpleCounters[kind]; exists {
+ counter.Increment(mesh)
+ }
+
+ if kind == meshresource.InstanceKind {
+ instance, ok :=
resource.(*meshresource.InstanceResource)
+ if ok && instance.Spec != nil {
+ protocol := instance.Spec.GetProtocol()
+ if protocol != "" {
+ if cfg :=
cm.getDistributionConfig(kind, ProtocolCounter); cfg != nil {
+ cfg.counter.Increment(mesh,
protocol)
+ }
+ }
+
+ releaseVersion :=
instance.Spec.GetReleaseVersion()
+ if releaseVersion != "" {
+ if cfg :=
cm.getDistributionConfig(kind, ReleaseCounter); cfg != nil {
+ cfg.counter.Increment(mesh,
releaseVersion)
+ }
+ }
+
+ if cfg := cm.getDistributionConfig(kind,
DiscoveryCounter); cfg != nil {
+ cfg.counter.Increment(mesh, mesh)
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
func (c *managerComponent) CounterManager() CounterManager {
return c.manager
}
diff --git a/pkg/console/counter/counter.go b/pkg/console/counter/counter.go
index c32d3516..6d5e2a1a 100644
--- a/pkg/console/counter/counter.go
+++ b/pkg/console/counter/counter.go
@@ -19,70 +19,121 @@ package counter
import (
"sync"
- "sync/atomic"
)
type Counter struct {
- name string
- value atomic.Int64
+ name string
+ data map[string]int64
+ mu sync.RWMutex
}
func NewCounter(name string) *Counter {
- return &Counter{name: name}
+ return &Counter{
+ name: name,
+ data: make(map[string]int64),
+ }
}
func (c *Counter) Get() int64 {
- return c.value.Load()
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ var sum int64
+ for _, v := range c.data {
+ sum += v
+ }
+ return sum
}
-func (c *Counter) Increment() {
- c.value.Add(1)
+func (c *Counter) GetByGroup(group string) int64 {
+ if group == "" {
+ group = "default"
+ }
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.data[group]
}
-func (c *Counter) Decrement() {
- for {
- current := c.value.Load()
- if current == 0 {
- return
- }
- if c.value.CompareAndSwap(current, current-1) {
- return
+func (c *Counter) Increment(group string) {
+ if group == "" {
+ group = "default"
+ }
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.data[group]++
+}
+
+func (c *Counter) Decrement(group string) {
+ if group == "" {
+ group = "default"
+ }
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if value, ok := c.data[group]; ok {
+ value--
+ if value <= 0 {
+ delete(c.data, group)
+ } else {
+ c.data[group] = value
}
}
}
func (c *Counter) Reset() {
- c.value.Store(0)
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.data = make(map[string]int64)
}
type DistributionCounter struct {
name string
- data map[string]int64
+ data map[string]map[string]int64
mu sync.RWMutex
}
func NewDistributionCounter(name string) *DistributionCounter {
return &DistributionCounter{
name: name,
- data: make(map[string]int64),
+ data: make(map[string]map[string]int64),
}
}
-func (c *DistributionCounter) Increment(key string) {
+func (c *DistributionCounter) Increment(group, key string) {
+ if group == "" {
+ group = "default"
+ }
+ if key == "" {
+ key = "unknown"
+ }
c.mu.Lock()
defer c.mu.Unlock()
- c.data[key]++
+ if c.data[group] == nil {
+ c.data[group] = make(map[string]int64)
+ }
+ c.data[group][key]++
}
-func (c *DistributionCounter) Decrement(key string) {
+func (c *DistributionCounter) Decrement(group, key string) {
+ if group == "" {
+ group = "default"
+ }
+ if key == "" {
+ key = "unknown"
+ }
c.mu.Lock()
defer c.mu.Unlock()
- if value, ok := c.data[key]; ok {
+ groupData, exists := c.data[group]
+ if !exists {
+ return
+ }
+ if value, ok := groupData[key]; ok {
value--
if value <= 0 {
- delete(c.data, key)
+ delete(groupData, key)
+ if len(groupData) == 0 {
+ delete(c.data, group)
+ }
} else {
- c.data[key] = value
+ groupData[key] = value
}
}
}
@@ -90,8 +141,27 @@ func (c *DistributionCounter) Decrement(key string) {
func (c *DistributionCounter) GetAll() map[string]int64 {
c.mu.RLock()
defer c.mu.RUnlock()
- result := make(map[string]int64, len(c.data))
- for k, v := range c.data {
+ result := make(map[string]int64)
+ for _, groupData := range c.data {
+ for k, v := range groupData {
+ result[k] += v
+ }
+ }
+ return result
+}
+
+func (c *DistributionCounter) GetByGroup(group string) map[string]int64 {
+ if group == "" {
+ group = "default"
+ }
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ groupData, exists := c.data[group]
+ if !exists {
+ return map[string]int64{}
+ }
+ result := make(map[string]int64, len(groupData))
+ for k, v := range groupData {
result[k] = v
}
return result
@@ -100,5 +170,5 @@ func (c *DistributionCounter) GetAll() map[string]int64 {
func (c *DistributionCounter) Reset() {
c.mu.Lock()
defer c.mu.Unlock()
- c.data = make(map[string]int64)
+ c.data = make(map[string]map[string]int64)
}
diff --git a/pkg/console/counter/manager.go b/pkg/console/counter/manager.go
index b8d0d549..581139fa 100644
--- a/pkg/console/counter/manager.go
+++ b/pkg/console/counter/manager.go
@@ -23,6 +23,7 @@ import (
"k8s.io/client-go/tools/cache"
"github.com/apache/dubbo-admin/pkg/core/events"
+ "github.com/apache/dubbo-admin/pkg/core/logger"
meshresource
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
)
@@ -42,6 +43,8 @@ type CounterManager interface {
RegisterDistributionCounter(kind resmodel.ResourceKind, metric
CounterType, extractor FieldExtractor)
Count(kind resmodel.ResourceKind) int64
Distribution(metric CounterType) map[string]int64
+ CountByMesh(kind resmodel.ResourceKind, mesh string) int64
+ DistributionByMesh(metric CounterType, mesh string) map[string]int64
Reset()
Bind(bus events.EventBus) error
}
@@ -137,12 +140,28 @@ func (cm *counterManager) Distribution(metric CounterType) map[string]int64 {
if !exists {
return map[string]int64{}
}
- raw := counter.GetAll()
- result := make(map[string]int64, len(raw))
- for k, v := range raw {
- result[k] = v
+ return counter.GetAll()
+}
+
+func (cm *counterManager) CountByMesh(kind resmodel.ResourceKind, mesh string)
int64 {
+ if mesh == "" {
+ mesh = "default"
+ }
+ if counter, exists := cm.simpleCounters[kind]; exists {
+ return counter.GetByGroup(mesh)
+ }
+ return 0
+}
+
+func (cm *counterManager) DistributionByMesh(metric CounterType, mesh string)
map[string]int64 {
+ if mesh == "" {
+ mesh = "default"
}
- return result
+ counter, exists := cm.distributionByType[metric]
+ if !exists {
+ return map[string]int64{}
+ }
+ return counter.GetByGroup(mesh)
}
func (cm *counterManager) Bind(bus events.EventBus) error {
@@ -167,11 +186,14 @@ func (cm *counterManager) Bind(bus events.EventBus) error {
if err := bus.Subscribe(subscriber); err != nil {
return err
}
+ logger.Infof("CounterManager subscribed to %s events",
resourceKind)
}
+ logger.Infof("CounterManager bound to EventBus successfully")
return nil
}
func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event
events.Event) error {
+ logger.Debugf("CounterManager handling %s event, type: %s", kind,
event.Type())
if counter := cm.simpleCounters[kind]; counter != nil {
processSimpleCounter(counter, event)
}
@@ -183,36 +205,72 @@ func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event events.E
return nil
}
+func (cm *counterManager) getDistributionConfig(kind resmodel.ResourceKind,
metric CounterType) *distributionCounterConfig {
+ configs := cm.distributionConfigs[kind]
+ for _, cfg := range configs {
+ if cfg.counterType == metric {
+ return cfg
+ }
+ }
+ return nil
+}
+
+func extractMeshName(res resmodel.Resource) string {
+ if res == nil {
+ return "default"
+ }
+ mesh := res.ResourceMesh()
+ if mesh == "" {
+ return "default"
+ }
+ return mesh
+}
+
func processSimpleCounter(counter *Counter, event events.Event) {
+ mesh := extractMeshName(event.NewObj())
+ if event.NewObj() == nil {
+ mesh = extractMeshName(event.OldObj())
+ }
+
switch event.Type() {
case cache.Added:
- counter.Increment()
+ counter.Increment(mesh)
+ logger.Debugf("CounterManager: Increment %s for mesh=%s,
current count=%d", counter.name, mesh, counter.GetByGroup(mesh))
case cache.Sync, cache.Replaced:
if isNewResourceEvent(event) {
- counter.Increment()
+ counter.Increment(mesh)
+ logger.Debugf("CounterManager: Increment %s for mesh=%s
(Sync/Replaced), current count=%d", counter.name, mesh,
counter.GetByGroup(mesh))
}
case cache.Deleted:
- counter.Decrement()
+ counter.Decrement(mesh)
+ logger.Debugf("CounterManager: Decrement %s for mesh=%s,
current count=%d", counter.name, mesh, counter.GetByGroup(mesh))
case cache.Updated:
- // no-op for simple counters
default:
}
}
func processDistributionCounter(cfg *distributionCounterConfig, event
events.Event) {
+ mesh := extractMeshName(event.NewObj())
+ if event.NewObj() == nil {
+ mesh = extractMeshName(event.OldObj())
+ }
+
switch event.Type() {
case cache.Added:
- cfg.increment(cfg.extractFrom(event.NewObj()))
+ key := cfg.extractFrom(event.NewObj())
+ cfg.counter.Increment(mesh, normalizeDistributionKey(key))
case cache.Sync, cache.Replaced:
if isNewResourceEvent(event) {
- cfg.increment(cfg.extractFrom(event.NewObj()))
+ key := cfg.extractFrom(event.NewObj())
+ cfg.counter.Increment(mesh,
normalizeDistributionKey(key))
} else {
cfg.update(event.OldObj(), event.NewObj())
}
case cache.Updated:
cfg.update(event.OldObj(), event.NewObj())
case cache.Deleted:
- cfg.decrement(cfg.extractFrom(event.OldObj()))
+ key := cfg.extractFrom(event.OldObj())
+ cfg.counter.Decrement(mesh, normalizeDistributionKey(key))
default:
}
}
@@ -224,25 +282,19 @@ func (cfg *distributionCounterConfig) extractFrom(res resmodel.Resource) string
return cfg.extractor(res)
}
-func (cfg *distributionCounterConfig) increment(key string) {
- cfg.counter.Increment(normalizeDistributionKey(key))
-}
-
-func (cfg *distributionCounterConfig) decrement(key string) {
- cfg.counter.Decrement(normalizeDistributionKey(key))
-}
-
func (cfg *distributionCounterConfig) update(oldObj, newObj resmodel.Resource)
{
oldKey := normalizeDistributionKey(cfg.extractFrom(oldObj))
newKey := normalizeDistributionKey(cfg.extractFrom(newObj))
- if oldKey == newKey {
+ oldMesh := extractMeshName(oldObj)
+ newMesh := extractMeshName(newObj)
+ if oldKey == newKey && oldMesh == newMesh {
return
}
if oldObj != nil {
- cfg.counter.Decrement(oldKey)
+ cfg.counter.Decrement(oldMesh, oldKey)
}
if newObj != nil {
- cfg.counter.Increment(newKey)
+ cfg.counter.Increment(newMesh, newKey)
}
}
diff --git a/pkg/console/handler/overview.go b/pkg/console/handler/overview.go
index d0ad7c01..1d9dc971 100644
--- a/pkg/console/handler/overview.go
+++ b/pkg/console/handler/overview.go
@@ -70,12 +70,23 @@ func ClusterOverview(ctx consolectx.Context) gin.HandlerFunc {
return func(c *gin.Context) {
resp := model.NewOverviewResp()
if counterMgr := ctx.CounterManager(); counterMgr != nil {
- resp.AppCount =
counterMgr.Count(meshresource.ApplicationKind)
- resp.ServiceCount =
counterMgr.Count(meshresource.ServiceProviderMetadataKind)
- resp.InsCount =
counterMgr.Count(meshresource.InstanceKind)
- resp.Protocols =
counterMgr.Distribution(counter.ProtocolCounter)
- resp.Releases =
counterMgr.Distribution(counter.ReleaseCounter)
- resp.Discoveries =
counterMgr.Distribution(counter.DiscoveryCounter)
+ mesh := c.Query("mesh")
+
+ if mesh != "" {
+ resp.AppCount =
counterMgr.CountByMesh(meshresource.ApplicationKind, mesh)
+ resp.ServiceCount =
counterMgr.CountByMesh(meshresource.ServiceProviderMetadataKind, mesh)
+ resp.InsCount =
counterMgr.CountByMesh(meshresource.InstanceKind, mesh)
+ resp.Protocols =
counterMgr.DistributionByMesh(counter.ProtocolCounter, mesh)
+ resp.Releases =
counterMgr.DistributionByMesh(counter.ReleaseCounter, mesh)
+ resp.Discoveries =
counterMgr.DistributionByMesh(counter.DiscoveryCounter, mesh)
+ } else {
+ resp.AppCount =
counterMgr.Count(meshresource.ApplicationKind)
+ resp.ServiceCount =
counterMgr.Count(meshresource.ServiceProviderMetadataKind)
+ resp.InsCount =
counterMgr.Count(meshresource.InstanceKind)
+ resp.Protocols =
counterMgr.Distribution(counter.ProtocolCounter)
+ resp.Releases =
counterMgr.Distribution(counter.ReleaseCounter)
+ resp.Discoveries =
counterMgr.Distribution(counter.DiscoveryCounter)
+ }
}
c.JSON(http.StatusOK, model.NewSuccessResp(resp))
}
diff --git a/pkg/core/discovery/subscriber/instance.go b/pkg/core/discovery/subscriber/instance.go
index 99714403..ded8a67d 100644
--- a/pkg/core/discovery/subscriber/instance.go
+++ b/pkg/core/discovery/subscriber/instance.go
@@ -71,6 +71,7 @@ func (s *InstanceEventSubscriber) ProcessEvent(event events.Event) error {
instanceRes = oldObj
}
instanceResList, err := s.instanceStore.ListByIndexes(map[string]string{
+ index.ByMeshIndex: instanceRes.Mesh,
index.ByInstanceAppNameIndex: instanceRes.Spec.AppName,
})
diff --git a/pkg/core/discovery/subscriber/nacos_service.go b/pkg/core/discovery/subscriber/nacos_service.go
index 6a83771d..e5c9c631 100644
--- a/pkg/core/discovery/subscriber/nacos_service.go
+++ b/pkg/core/discovery/subscriber/nacos_service.go
@@ -128,6 +128,7 @@ func (n *NacosServiceEventSubscriber) processConsumerMetadataUpsert(serviceRes *
return err
}
resources, err := st.ListByIndexes(map[string]string{
+ index.ByMeshIndex: serviceRes.Mesh,
index.ByServiceConsumerServiceName: serviceName,
})
if err != nil {
@@ -212,6 +213,7 @@ func (n *NacosServiceEventSubscriber) processRPCInstanceUpsert(serviceRes *meshr
return err
}
resources, err := st.ListByIndexes(map[string]string{
+ index.ByMeshIndex: serviceRes.Mesh,
index.ByRPCInstanceAppName: serviceRes.Name,
})
if err != nil {
@@ -286,6 +288,7 @@ func (n *NacosServiceEventSubscriber) processServiceConsumerDelete(serviceRes *m
return err
}
resources, err := st.ListByIndexes(map[string]string{
+ index.ByMeshIndex: serviceRes.Mesh,
index.ByServiceConsumerServiceName: serviceRes.Name,
})
if err != nil {
@@ -309,6 +312,7 @@ func (n *NacosServiceEventSubscriber) processRPCInstanceDelete(serviceRes *meshr
return err
}
resources, err := st.ListByIndexes(map[string]string{
+ index.ByMeshIndex: serviceRes.Mesh,
index.ByRPCInstanceAppName: serviceRes.Name,
})
if err != nil {