This is an automated email from the ASF dual-hosted git repository.
robocanic pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git
The following commit(s) were added to refs/heads/develop by this push:
new 1e55d306 Implement Counter in local cache (#1345)
1e55d306 is described below
commit 1e55d30635381f76d9e94478082a74b351887283
Author: WyRainBow <[email protected]>
AuthorDate: Mon Nov 10 20:57:05 2025 +0800
Implement Counter in local cache (#1345)
---
pkg/console/context/context.go | 14 ++
pkg/console/counter/component.go | 73 ++++++++++
pkg/console/counter/counter.go | 104 +++++++++++++
pkg/console/counter/manager.go | 306 +++++++++++++++++++++++++++++++++++++++
pkg/console/handler/overview.go | 11 +-
pkg/console/model/overview.go | 12 +-
pkg/core/bootstrap/bootstrap.go | 15 +-
pkg/core/events/component.go | 6 +-
8 files changed, 530 insertions(+), 11 deletions(-)
diff --git a/pkg/console/context/context.go b/pkg/console/context/context.go
index f423ca09..9133c4f9 100644
--- a/pkg/console/context/context.go
+++ b/pkg/console/context/context.go
@@ -21,12 +21,14 @@ import (
ctx "context"
"github.com/apache/dubbo-admin/pkg/config/app"
+ "github.com/apache/dubbo-admin/pkg/console/counter"
"github.com/apache/dubbo-admin/pkg/core/manager"
"github.com/apache/dubbo-admin/pkg/core/runtime"
)
type Context interface {
ResourceManager() manager.ResourceManager
+ CounterManager() counter.CounterManager
Config() app.AdminConfig
@@ -57,3 +59,15 @@ func (c *context) ResourceManager() manager.ResourceManager {
rmc, _ := c.coreRt.GetComponent(runtime.ResourceManager)
return rmc.(manager.ResourceManagerComponent).ResourceManager()
}
+
+func (c *context) CounterManager() counter.CounterManager {
+ comp, err := c.coreRt.GetComponent(counter.ComponentType)
+ if err != nil {
+ return nil
+ }
+ managerComp, ok := comp.(counter.ManagerComponent)
+ if !ok {
+ return nil
+ }
+ return managerComp.CounterManager()
+}
diff --git a/pkg/console/counter/component.go b/pkg/console/counter/component.go
new file mode 100644
index 00000000..c38d7fe8
--- /dev/null
+++ b/pkg/console/counter/component.go
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package counter
+
+import (
+ "fmt"
+ "math"
+
+ "github.com/apache/dubbo-admin/pkg/core/events"
+ "github.com/apache/dubbo-admin/pkg/core/runtime"
+)
+
+const ComponentType runtime.ComponentType = "counter manager"
+
+func init() {
+ runtime.RegisterComponent(&managerComponent{})
+}
+
+type ManagerComponent interface {
+ runtime.Component
+ CounterManager() CounterManager
+}
+
+var _ ManagerComponent = &managerComponent{}
+
+type managerComponent struct {
+ manager CounterManager
+}
+
+func (c *managerComponent) Type() runtime.ComponentType {
+ return ComponentType
+}
+
+func (c *managerComponent) Order() int {
+ return math.MaxInt - 1
+}
+
+func (c *managerComponent) Init(runtime.BuilderContext) error {
+ mgr := NewCounterManager()
+ c.manager = mgr
+ return nil
+}
+
+func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
+ component, err := rt.GetComponent(runtime.EventBus)
+ if err != nil {
+ return err
+ }
+ bus, ok := component.(events.EventBus)
+ if !ok {
+ return fmt.Errorf("component %s does not implement events.EventBus", runtime.EventBus)
+ }
+ return c.manager.Bind(bus)
+}
+
+func (c *managerComponent) CounterManager() CounterManager {
+ return c.manager
+}
diff --git a/pkg/console/counter/counter.go b/pkg/console/counter/counter.go
new file mode 100644
index 00000000..c32d3516
--- /dev/null
+++ b/pkg/console/counter/counter.go
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package counter
+
+import (
+ "sync"
+ "sync/atomic"
+)
+
+type Counter struct {
+ name string
+ value atomic.Int64
+}
+
+func NewCounter(name string) *Counter {
+ return &Counter{name: name}
+}
+
+func (c *Counter) Get() int64 {
+ return c.value.Load()
+}
+
+func (c *Counter) Increment() {
+ c.value.Add(1)
+}
+
+func (c *Counter) Decrement() {
+ for {
+ current := c.value.Load()
+ if current == 0 {
+ return
+ }
+ if c.value.CompareAndSwap(current, current-1) {
+ return
+ }
+ }
+}
+
+func (c *Counter) Reset() {
+ c.value.Store(0)
+}
+
+type DistributionCounter struct {
+ name string
+ data map[string]int64
+ mu sync.RWMutex
+}
+
+func NewDistributionCounter(name string) *DistributionCounter {
+ return &DistributionCounter{
+ name: name,
+ data: make(map[string]int64),
+ }
+}
+
+func (c *DistributionCounter) Increment(key string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.data[key]++
+}
+
+func (c *DistributionCounter) Decrement(key string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if value, ok := c.data[key]; ok {
+ value--
+ if value <= 0 {
+ delete(c.data, key)
+ } else {
+ c.data[key] = value
+ }
+ }
+}
+
+func (c *DistributionCounter) GetAll() map[string]int64 {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ result := make(map[string]int64, len(c.data))
+ for k, v := range c.data {
+ result[k] = v
+ }
+ return result
+}
+
+func (c *DistributionCounter) Reset() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.data = make(map[string]int64)
+}
diff --git a/pkg/console/counter/manager.go b/pkg/console/counter/manager.go
new file mode 100644
index 00000000..f9fb7ea3
--- /dev/null
+++ b/pkg/console/counter/manager.go
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package counter
+
+import (
+ "fmt"
+
+ "k8s.io/client-go/tools/cache"
+
+ "github.com/apache/dubbo-admin/pkg/core/events"
+ meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+ resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+)
+
+type CounterType string
+
+type FieldExtractor func(resmodel.Resource) string
+
+const (
+ ProtocolCounter CounterType = "protocol"
+ ReleaseCounter CounterType = "release"
+ DiscoveryCounter CounterType = "discovery"
+)
+
+type CounterManager interface {
+ RegisterSimpleCounter(kind resmodel.ResourceKind)
+ RegisterDistributionCounter(kind resmodel.ResourceKind, metric CounterType, extractor FieldExtractor)
+ Count(kind resmodel.ResourceKind) int64
+ Distribution(metric CounterType) map[string]int64
+ Reset()
+ Bind(bus events.EventBus) error
+}
+
+type distributionCounterConfig struct {
+ counterType CounterType
+ counter *DistributionCounter
+ extractor func(resmodel.Resource) string
+}
+
+type counterManager struct {
+ simpleCounters map[resmodel.ResourceKind]*Counter
+ distributionConfigs map[resmodel.ResourceKind][]*distributionCounterConfig
+ distributionByType map[CounterType]*DistributionCounter
+}
+
+func NewCounterManager() CounterManager {
+ return newCounterManager()
+}
+
+func newCounterManager() *counterManager {
+ cm := &counterManager{
+ simpleCounters: make(map[resmodel.ResourceKind]*Counter),
+ distributionConfigs: make(map[resmodel.ResourceKind][]*distributionCounterConfig),
+ distributionByType: make(map[CounterType]*DistributionCounter),
+ }
+
+ cm.RegisterSimpleCounter(meshresource.ApplicationKind)
+ cm.RegisterSimpleCounter(meshresource.ServiceKind)
+ cm.RegisterSimpleCounter(meshresource.InstanceKind)
+
+ cm.RegisterDistributionCounter(meshresource.InstanceKind, ProtocolCounter, instanceProtocolKey)
+ cm.RegisterDistributionCounter(meshresource.InstanceKind, ReleaseCounter, instanceReleaseKey)
+ cm.RegisterDistributionCounter(meshresource.InstanceKind, DiscoveryCounter, instanceMeshKey)
+
+ return cm
+}
+
+func (cm *counterManager) RegisterSimpleCounter(kind resmodel.ResourceKind) {
+ if kind == "" {
+ return
+ }
+ if _, exists := cm.simpleCounters[kind]; exists {
+ return
+ }
+ cm.simpleCounters[kind] = NewCounter(string(kind))
+}
+
+func (cm *counterManager) RegisterDistributionCounter(kind resmodel.ResourceKind, metric CounterType, extractor FieldExtractor) {
+ if kind == "" || metric == "" {
+ return
+ }
+ counter := cm.distributionByType[metric]
+ if counter == nil {
+ counter = NewDistributionCounter(string(metric))
+ cm.distributionByType[metric] = counter
+ }
+
+ configs := cm.distributionConfigs[kind]
+ for _, cfg := range configs {
+ if cfg.counterType == metric {
+ cfg.counter = counter
+ cfg.extractor = extractor
+ return
+ }
+ }
+
+ cm.distributionConfigs[kind] = append(configs, &distributionCounterConfig{
+ counterType: metric,
+ counter: counter,
+ extractor: extractor,
+ })
+}
+
+func (cm *counterManager) Reset() {
+ for _, counter := range cm.simpleCounters {
+ counter.Reset()
+ }
+ for _, counter := range cm.distributionByType {
+ counter.Reset()
+ }
+}
+
+func (cm *counterManager) Count(kind resmodel.ResourceKind) int64 {
+ if counter, exists := cm.simpleCounters[kind]; exists {
+ return counter.Get()
+ }
+ return 0
+}
+
+func (cm *counterManager) Distribution(metric CounterType) map[string]int64 {
+ counter, exists := cm.distributionByType[metric]
+ if !exists {
+ return map[string]int64{}
+ }
+ raw := counter.GetAll()
+ result := make(map[string]int64, len(raw))
+ for k, v := range raw {
+ result[k] = v
+ }
+ return result
+}
+
+func (cm *counterManager) Bind(bus events.EventBus) error {
+ handledKinds := make(map[resmodel.ResourceKind]struct{})
+ for kind := range cm.simpleCounters {
+ handledKinds[kind] = struct{}{}
+ }
+ for kind := range cm.distributionConfigs {
+ handledKinds[kind] = struct{}{}
+ }
+
+ for kind := range handledKinds {
+ resourceKind := kind
+ name := fmt.Sprintf("counter-manager/%s", resourceKind)
+ subscriber := &counterEventSubscriber{
+ kind: resourceKind,
+ name: name,
+ handler: func(event events.Event) error {
+ return cm.handleEvent(resourceKind, event)
+ },
+ }
+ if err := bus.Subscribe(subscriber); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event events.Event) error {
+ if counter := cm.simpleCounters[kind]; counter != nil {
+ processSimpleCounter(counter, event)
+ }
+ if configs := cm.distributionConfigs[kind]; len(configs) > 0 {
+ for _, cfg := range configs {
+ processDistributionCounter(cfg, event)
+ }
+ }
+ return nil
+}
+
+func processSimpleCounter(counter *Counter, event events.Event) {
+ switch event.Type() {
+ case cache.Added:
+ counter.Increment()
+ case cache.Sync, cache.Replaced:
+ if isNewResourceEvent(event) {
+ counter.Increment()
+ }
+ case cache.Deleted:
+ counter.Decrement()
+ case cache.Updated:
+ // no-op for simple counters
+ default:
+ }
+}
+
+func processDistributionCounter(cfg *distributionCounterConfig, event events.Event) {
+ switch event.Type() {
+ case cache.Added:
+ cfg.increment(cfg.extractFrom(event.NewObj()))
+ case cache.Sync, cache.Replaced:
+ if isNewResourceEvent(event) {
+ cfg.increment(cfg.extractFrom(event.NewObj()))
+ } else {
+ cfg.update(event.OldObj(), event.NewObj())
+ }
+ case cache.Updated:
+ cfg.update(event.OldObj(), event.NewObj())
+ case cache.Deleted:
+ cfg.decrement(cfg.extractFrom(event.OldObj()))
+ default:
+ }
+}
+
+func (cfg *distributionCounterConfig) extractFrom(res resmodel.Resource) string {
+ if cfg.extractor == nil || res == nil {
+ return ""
+ }
+ return cfg.extractor(res)
+}
+
+func (cfg *distributionCounterConfig) increment(key string) {
+ cfg.counter.Increment(normalizeDistributionKey(key))
+}
+
+func (cfg *distributionCounterConfig) decrement(key string) {
+ cfg.counter.Decrement(normalizeDistributionKey(key))
+}
+
+func (cfg *distributionCounterConfig) update(oldObj, newObj resmodel.Resource) {
+ oldKey := normalizeDistributionKey(cfg.extractFrom(oldObj))
+ newKey := normalizeDistributionKey(cfg.extractFrom(newObj))
+ if oldKey == newKey {
+ return
+ }
+ if oldObj != nil {
+ cfg.counter.Decrement(oldKey)
+ }
+ if newObj != nil {
+ cfg.counter.Increment(newKey)
+ }
+}
+
+func instanceProtocolKey(res resmodel.Resource) string {
+ instance, ok := res.(*meshresource.InstanceResource)
+ if !ok || instance == nil || instance.Spec == nil {
+ return ""
+ }
+ return instance.Spec.GetProtocol()
+}
+
+func instanceReleaseKey(res resmodel.Resource) string {
+ instance, ok := res.(*meshresource.InstanceResource)
+ if !ok || instance == nil || instance.Spec == nil {
+ return ""
+ }
+ return instance.Spec.GetReleaseVersion()
+}
+
+func instanceMeshKey(res resmodel.Resource) string {
+ instance, ok := res.(*meshresource.InstanceResource)
+ if !ok || instance == nil {
+ return ""
+ }
+ return instance.Mesh
+}
+
+func normalizeDistributionKey(key string) string {
+ if key == "" {
+ return "unknown"
+ }
+ return key
+}
+
+func isNewResourceEvent(event events.Event) bool {
+ if event == nil {
+ return false
+ }
+ return event.OldObj() == nil
+}
+
+type counterEventSubscriber struct {
+ kind resmodel.ResourceKind
+ name string
+ handler func(events.Event) error
+}
+
+func (s *counterEventSubscriber) ResourceKind() resmodel.ResourceKind {
+ return s.kind
+}
+
+func (s *counterEventSubscriber) Name() string {
+ return s.name
+}
+
+func (s *counterEventSubscriber) ProcessEvent(event events.Event) error {
+ if s.handler == nil {
+ return nil
+ }
+ return s.handler(event)
+}
diff --git a/pkg/console/handler/overview.go b/pkg/console/handler/overview.go
index ae1a1e6d..c2aac533 100644
--- a/pkg/console/handler/overview.go
+++ b/pkg/console/handler/overview.go
@@ -23,7 +23,9 @@ import (
"github.com/gin-gonic/gin"
consolectx "github.com/apache/dubbo-admin/pkg/console/context"
+ "github.com/apache/dubbo-admin/pkg/console/counter"
"github.com/apache/dubbo-admin/pkg/console/model"
+ meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
)
func AdminMetadata(ctx consolectx.Context) gin.HandlerFunc {
@@ -38,7 +40,14 @@ func AdminMetadata(ctx consolectx.Context) gin.HandlerFunc {
func ClusterOverview(ctx consolectx.Context) gin.HandlerFunc {
return func(c *gin.Context) {
resp := model.NewOverviewResp()
-
+ if counterMgr := ctx.CounterManager(); counterMgr != nil {
+ resp.AppCount = counterMgr.Count(meshresource.ApplicationKind)
+ resp.ServiceCount = counterMgr.Count(meshresource.ServiceKind)
+ resp.InsCount = counterMgr.Count(meshresource.InstanceKind)
+ resp.Protocols = counterMgr.Distribution(counter.ProtocolCounter)
+ resp.Releases = counterMgr.Distribution(counter.ReleaseCounter)
+ resp.Discoveries = counterMgr.Distribution(counter.DiscoveryCounter)
+ }
c.JSON(http.StatusOK, model.NewSuccessResp(resp))
}
}
diff --git a/pkg/console/model/overview.go b/pkg/console/model/overview.go
index 2a467a92..6ffbf0e0 100644
--- a/pkg/console/model/overview.go
+++ b/pkg/console/model/overview.go
@@ -18,12 +18,12 @@
package model
type OverviewResp struct {
- AppCount int `json:"appCount"`
- ServiceCount int `json:"serviceCount"`
- InsCount int `json:"insCount"`
- Protocols map[string]int `json:"protocols"`
- Releases map[string]int `json:"releases"`
- Discoveries map[string]int `json:"discoveries"`
+ AppCount int64 `json:"appCount"`
+ ServiceCount int64 `json:"serviceCount"`
+ InsCount int64 `json:"insCount"`
+ Protocols map[string]int64 `json:"protocols"`
+ Releases map[string]int64 `json:"releases"`
+ Discoveries map[string]int64 `json:"discoveries"`
}
func NewOverviewResp() *OverviewResp {
diff --git a/pkg/core/bootstrap/bootstrap.go b/pkg/core/bootstrap/bootstrap.go
index 213c817f..7abcae7d 100644
--- a/pkg/core/bootstrap/bootstrap.go
+++ b/pkg/core/bootstrap/bootstrap.go
@@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"github.com/apache/dubbo-admin/pkg/config/app"
+ "github.com/apache/dubbo-admin/pkg/console/counter"
"github.com/apache/dubbo-admin/pkg/core/logger"
"github.com/apache/dubbo-admin/pkg/core/runtime"
"github.com/apache/dubbo-admin/pkg/diagnostics"
@@ -57,7 +58,11 @@ func Bootstrap(appCtx context.Context, cfg app.AdminConfig) (runtime.Runtime, er
if err := initializeConsole(builder); err != nil {
return nil, err
}
- // 6. initialize diagnotics
+ // 6. initialize counter manager
+ if err := initializeCounterManager(builder); err != nil {
+ return nil, err
+ }
+ // 7. initialize diagnostics
if err := initializeDiagnoticsServer(builder); err != nil {
logger.Errorf("got error when init diagnotics server %s", err)
}
@@ -123,6 +128,14 @@ func initializeDiagnoticsServer(builder *runtime.Builder) error {
return initAndActivateComponent(builder, comp)
}
+func initializeCounterManager(builder *runtime.Builder) error {
+ comp, err := runtime.ComponentRegistry().Get(counter.ComponentType)
+ if err != nil {
+ return err
+ }
+ return initAndActivateComponent(builder, comp)
+}
+
func initAndActivateComponent(builder *runtime.Builder, comp runtime.Component) error {
logger.Infof("initializing %s ...", comp.Type())
if err := comp.Init(builder); err != nil {
diff --git a/pkg/core/events/component.go b/pkg/core/events/component.go
index 6c1fff9a..2e6bfc0f 100644
--- a/pkg/core/events/component.go
+++ b/pkg/core/events/component.go
@@ -100,10 +100,10 @@ func (b *eventBus) Send(event Event) {
b.rwMutex.RLock()
defer b.rwMutex.RUnlock()
var rk model.ResourceKind
- if event.OldObj() != nil {
- rk = event.OldObj().ResourceKind()
- } else if event.NewObj() != nil {
+ if event.NewObj() != nil {
rk = event.NewObj().ResourceKind()
+ } else if event.OldObj() != nil {
+ rk = event.OldObj().ResourceKind()
}
subs, exists := b.subscriberDir[rk]
if !exists {