This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new f017d1a7 feature: Add xds metrics (#872)
f017d1a7 is described below

commit f017d1a70406f9dc18bed7138ef8d7acefcbdff5
Author: Joe Zhong <[email protected]>
AuthorDate: Thu Mar 5 01:12:10 2026 +0800

    feature: Add xds metrics (#872)
---
 README.md                                    |   2 +-
 dubbod/discovery/pkg/bootstrap/monitoring.go | 105 ++++++
 dubbod/discovery/pkg/bootstrap/server.go     |   9 +
 dubbod/discovery/pkg/xds/ads.go              |  21 ++
 dubbod/discovery/pkg/xds/discovery.go        |  27 ++
 dubbod/discovery/pkg/xds/monitoring.go       | 177 ++++++++++
 dubbod/discovery/pkg/xds/v3/model.go         |   4 +
 pkg/model/xds.go                             |  19 ++
 pkg/monitoring/monitoring.go                 | 468 +++++++++++++++++++++++++++
 pkg/version/version.go                       |   4 +
 pkg/xds/monitoring.go                        |  96 +++++-
 pkg/xds/server.go                            |  43 ++-
 12 files changed, 954 insertions(+), 21 deletions(-)

diff --git a/README.md b/README.md
index a4c8d053..c4a5a747 100644
--- a/README.md
+++ b/README.md
@@ -20,7 +20,7 @@
 <h2 align="center">Dubbo Service Mesh for Kubernetes</h2>
 
 Dubbo gRPC open source service mesh implemented for the underlying cluster 
management platform can directly receive policies from the control plane and 
obtain features such as load balancing, service discovery, and observability 
without requiring a sidecar proxy.
-- For more detailed information on how to use it, please visit 
[dsm-docs](https://dubbo-kubernetes.github.io/dsm-docs/)
+- For more detailed information on how to use it, please visit 
[dubbo-kubernetes.io](https://dubbo-kubernetes.github.io/)
 
 ## Introduction
 
diff --git a/dubbod/discovery/pkg/bootstrap/monitoring.go 
b/dubbod/discovery/pkg/bootstrap/monitoring.go
new file mode 100644
index 00000000..3330dbbf
--- /dev/null
+++ b/dubbod/discovery/pkg/bootstrap/monitoring.go
@@ -0,0 +1,105 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bootstrap
+
+import (
+       "fmt"
+       "net"
+       "net/http"
+       "time"
+
+       "github.com/apache/dubbo-kubernetes/pkg/log"
+       "github.com/apache/dubbo-kubernetes/pkg/monitoring"
+       "github.com/apache/dubbo-kubernetes/pkg/version"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+const (
+       metricsPath = "/metrics"
+       versionPath = "/version"
+)
+
+var (
+       serverStart = time.Now()
+
+       _ = monitoring.NewDerivedGauge(
+               "dubbod_uptime_seconds",
+               "Current dubbod server uptime in seconds",
+       ).ValueFrom(func() float64 {
+               return time.Since(serverStart).Seconds()
+       })
+
+       versionTag   = monitoring.CreateLabel("version")
+       dubbodVersion = monitoring.NewGauge(
+               "dubbod_info",
+               "Dubbod version and build information.",
+               monitoring.WithLabels("version"),
+       )
+)
+
+func addMonitor(mux *http.ServeMux) {
+       exporter := promhttp.HandlerFor(monitoring.GetRegistry(), 
promhttp.HandlerOpts{})
+       mux.Handle(metricsPath, exporter)
+
+       mux.HandleFunc(versionPath, func(out http.ResponseWriter, req 
*http.Request) {
+               if _, err := out.Write([]byte(version.Info.String())); err != 
nil {
+                       log.Errorf("Unable to write version string: %v", err)
+               }
+       })
+}
+
+func (s *Server) initMonitor(addr string) error {
+       s.addStartFunc("monitoring", func(stop <-chan struct{}) error {
+               if addr == "" {
+                       return nil
+               }
+
+               listener, err := net.Listen("tcp", addr)
+               if err != nil {
+                       return fmt.Errorf("unable to listen on socket: %v", err)
+               }
+
+               addMonitor(s.monitoringMux)
+
+               // Record version info
+               
dubbodVersion.With(versionTag.Value(version.Info.String())).Record(1)
+
+               monitoringServer := &http.Server{
+                       Addr:        listener.Addr().String(),
+                       Handler:     s.monitoringMux,
+                       IdleTimeout: 90 * time.Second,
+                       ReadTimeout: 30 * time.Second,
+               }
+
+               go func() {
+                       log.Infof("starting monitoring server at %s", 
listener.Addr())
+                       if err := monitoringServer.Serve(listener); err != nil 
&& err != http.ErrServerClosed {
+                               log.Errorf("error serving monitoring server: 
%v", err)
+                       }
+               }()
+
+               go func() {
+                       <-stop
+                       if err := monitoringServer.Close(); err != nil {
+                               log.Errorf("error closing monitoring server: 
%v", err)
+                       }
+               }()
+
+               return nil
+       })
+       return nil
+}
diff --git a/dubbod/discovery/pkg/bootstrap/server.go 
b/dubbod/discovery/pkg/bootstrap/server.go
index 59923e34..60d155f8 100644
--- a/dubbod/discovery/pkg/bootstrap/server.go
+++ b/dubbod/discovery/pkg/bootstrap/server.go
@@ -94,6 +94,9 @@ type Server struct {
        httpsAddr   string
        httpMux     *http.ServeMux
        httpsMux    *http.ServeMux // webhooks
+       
+       monitoringMux   *http.ServeMux
+       metricsExporter http.Handler
 
        ConfigStores     []model.ConfigStoreController
        configController model.ConfigStoreController
@@ -177,6 +180,7 @@ func NewServer(args *DubboArgs, initFuncs ...func(*Server)) 
(*Server, error) {
                server:                  server.New(),
                clusterID:               getClusterID(args),
                httpMux:                 http.NewServeMux(),
+               monitoringMux:           http.NewServeMux(),
                dubbodCertBundleWatcher: keycertbundle.NewWatcher(),
                fileWatcher:             filewatcher.NewWatcher(),
                internalStop:            make(chan struct{}),
@@ -232,6 +236,11 @@ func NewServer(args *DubboArgs, initFuncs 
...func(*Server)) (*Server, error) {
        }
 
        InitGenerators(s.XDSServer, configGen)
+       
+       // Initialize monitoring server
+       if err := s.initMonitor(args.ServerOptions.HTTPAddr); err != nil {
+               return nil, fmt.Errorf("error initializing monitoring: %v", err)
+       }
 
        dubbodHost, _, err := e.GetDiscoveryAddress()
        if err != nil {
diff --git a/dubbod/discovery/pkg/xds/ads.go b/dubbod/discovery/pkg/xds/ads.go
index f7d605a5..5d1e3027 100644
--- a/dubbod/discovery/pkg/xds/ads.go
+++ b/dubbod/discovery/pkg/xds/ads.go
@@ -101,10 +101,19 @@ func (s *DiscoveryServer) AdsPushAll(req 
*model.PushRequest) {
                log.Infof("XDS: Pushing Services:%d ConnectedEndpoints:%d 
Version:%s",
                        totalService, connectedEndpoints, req.Push.PushVersion)
 
+               // Record services metric
+               monServices.Record(float64(totalService))
+
                if req.ConfigsUpdated == nil {
                        req.ConfigsUpdated = make(sets.Set[model.ConfigKey])
                }
        }
+
+       // Record push triggers
+       if req.Reason != nil {
+               recordPushTriggers(req.Reason)
+       }
+
        s.StartPush(req)
 }
 
@@ -128,6 +137,11 @@ func (s *DiscoveryServer) initConnection(node *core.Node, 
con *Connection, ident
        s.addCon(con.ID(), con)
        currentCount := s.adsClientCount()
        log.Infof("new connection for node:%s (total connections: %d)", 
con.ID(), currentCount)
+       
+       // Record XDS client connection. NOTE(review): version is hardcoded to "unknown" below, so the version label carries no information — populate it from node metadata when available.
+       version := "unknown"
+       recordXDSClients(version, 1)
+       
        defer con.MarkInitialized()
 
        if err := s.initializeProxy(con); err != nil {
@@ -150,6 +164,13 @@ func (s *DiscoveryServer) closeConnection(con *Connection) 
{
        if con.ID() == "" {
                return
        }
+       
+       // Record XDS client disconnection. NOTE(review): the connect path increments unconditionally, but this decrement is gated on con.proxy != nil — the dubbod_xds gauge can leak upward if proxy is ever nil here.
+       if con.proxy != nil {
+               version := "unknown"
+               recordXDSClients(version, -1)
+       }
+       
        s.removeCon(con.ID())
 }
 
diff --git a/dubbod/discovery/pkg/xds/discovery.go 
b/dubbod/discovery/pkg/xds/discovery.go
index 2bd75db9..2c12f68a 100644
--- a/dubbod/discovery/pkg/xds/discovery.go
+++ b/dubbod/discovery/pkg/xds/discovery.go
@@ -120,11 +120,17 @@ func (s *DiscoveryServer) Shutdown() {
 }
 
 func (s *DiscoveryServer) initPushContext(req *model.PushRequest, 
oldPushContext *model.PushContext, version string) *model.PushContext {
+       startTime := time.Now()
+       
        push := model.NewPushContext()
        push.PushVersion = version
        push.InitContext(s.Env, oldPushContext, req)
        s.dropCacheForRequest(req)
        s.Env.SetPushContext(push)
+       
+       // Record push context init time
+       pushContextInitTime.Record(time.Since(startTime).Seconds())
+       
        return push
 }
 
@@ -172,6 +178,10 @@ func debounce(ch chan *model.PushRequest, stopCh <-chan 
struct{}, opts DebounceO
        push := func(req *model.PushRequest, debouncedEvents int, startDebounce 
time.Time) {
                pushFn(req)
                updateSent.Add(int64(debouncedEvents))
+               
+               // Record debounce time metric
+               debounceTime.Record(time.Since(startDebounce).Seconds())
+               
                freeCh <- struct{}{}
        }
 
@@ -280,6 +290,12 @@ func doSendPushes(stopCh <-chan struct{}, semaphore chan 
struct{}, queue *PushQu
                        if shuttingdown {
                                return
                        }
+                       
+                       // Record proxy queue time
+                       if push != nil && !push.Start.IsZero() {
+                               
proxiesQueueTime.Record(time.Since(push.Start).Seconds())
+                       }
+                       
                        doneFunc := func() {
                                queue.MarkDone(client)
                                <-semaphore
@@ -355,14 +371,21 @@ func (s *DiscoveryServer) ConfigUpdate(req 
*model.PushRequest) {
                s.Cache.ClearAll()
        }
        s.InboundUpdates.Inc()
+       
+       // Record inbound update metrics
+       if req.ConfigsUpdated != nil && len(req.ConfigsUpdated) > 0 {
+               recordInboundConfigUpdate()
+       }
 
        s.pushChannel <- req
 }
 
 func (s *DiscoveryServer) ServiceUpdate(shard model.ShardKey, hostname string, 
namespace string, event model.Event) {
        if event == model.EventDelete {
+               recordInboundServiceDelete()
                s.Env.EndpointIndex.DeleteServiceShard(shard, hostname, 
namespace, false)
        } else {
+               recordInboundServiceUpdate()
        }
 }
 
@@ -418,6 +441,10 @@ func (s *DiscoveryServer) EDSUpdate(shard model.ShardKey, 
serviceName string, na
        // 3. Endpoint health status changes
        if pushType == model.IncrementalPush || pushType == model.FullPush {
                log.Debugf("service %s/%s triggering %v push [endpoints=%d]", 
namespace, serviceName, pushType, len(dubboEndpoints))
+               
+               // Record EDS update metric
+               recordInboundEDSUpdate()
+               
                s.ConfigUpdate(&model.PushRequest{
                        // Full:           pushType == model.FullPush,
                        // ConfigsUpdated: sets.New(model.ConfigKey{Kind: 
kind.ServiceEntry, Name: serviceName, Namespace: namespace}),
diff --git a/dubbod/discovery/pkg/xds/monitoring.go 
b/dubbod/discovery/pkg/xds/monitoring.go
new file mode 100644
index 00000000..732880b4
--- /dev/null
+++ b/dubbod/discovery/pkg/xds/monitoring.go
@@ -0,0 +1,177 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xds
+
+import (
+       "sync"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/dubbo-kubernetes/dubbod/discovery/pkg/model"
+       v3 "github.com/apache/dubbo-kubernetes/dubbod/discovery/pkg/xds/v3"
+       "github.com/apache/dubbo-kubernetes/pkg/monitoring"
+)
+
+var (
+       typeTag    = monitoring.CreateLabel("type")
+       versionTag = monitoring.CreateLabel("version")
+
+       monServices = monitoring.NewGauge(
+               "dubbod_services",
+               "Total services known to dubbod.",
+       )
+
+       xdsClients = monitoring.NewGauge(
+               "dubbod_xds",
+               "Number of endpoints connected to this dubbod using XDS.",
+               monitoring.WithLabels("version"),
+       )
+       xdsClientTrackerMutex = &sync.Mutex{}
+       xdsClientTracker      = make(map[string]float64)
+
+       pushes = monitoring.NewSum(
+               "dubbod_xds_pushes",
+               "Dubbod build and send errors for lds, rds, cds and eds.",
+               monitoring.WithLabels("type"),
+       )
+
+       cdsSendErrPushes = pushes.With(typeTag.Value("cds_senderr"))
+       edsSendErrPushes = pushes.With(typeTag.Value("eds_senderr"))
+       ldsSendErrPushes = pushes.With(typeTag.Value("lds_senderr"))
+       rdsSendErrPushes = pushes.With(typeTag.Value("rds_senderr"))
+
+       debounceTime = monitoring.NewDistribution(
+               "dubbod_debounce_time",
+               "Delay in seconds between the first config enters debouncing 
and the merged push request is pushed into the push queue.",
+               []float64{.01, .1, 1, 3, 5, 10, 20, 30},
+       )
+
+       pushContextInitTime = monitoring.NewDistribution(
+               "dubbod_pushcontext_init_seconds",
+               "Total time in seconds Dubbod takes to init pushContext.",
+               []float64{.01, .1, 0.5, 1, 3, 5},
+       )
+
+       pushTime = monitoring.NewDistribution(
+               "dubbod_xds_push_time",
+               "Total time in seconds Dubbod takes to push lds, rds, cds and 
eds.",
+               []float64{.01, .1, 1, 3, 5, 10, 20, 30},
+               monitoring.WithLabels("type"),
+       )
+
+       proxiesQueueTime = monitoring.NewDistribution(
+               "dubbod_proxy_queue_time",
+               "Time in seconds, a proxy is in the push queue before being 
dequeued.",
+               []float64{.1, .5, 1, 3, 5, 10, 20, 30},
+       )
+
+       pushTriggers = monitoring.NewSum(
+               "dubbod_push_triggers",
+               "Total number of times a push was triggered, labeled by reason 
for the push.",
+               monitoring.WithLabels("type"),
+       )
+
+       proxiesConvergeDelay = monitoring.NewDistribution(
+               "dubbod_proxy_convergence_time",
+               "Delay in seconds between config change and a proxy receiving 
all required configuration.",
+               []float64{.1, .5, 1, 3, 5, 10, 20, 30},
+       )
+
+       inboundUpdates = monitoring.NewSum(
+               "dubbod_inbound_updates",
+               "Total number of updates received by dubbod.",
+               monitoring.WithLabels("type"),
+       )
+
+       inboundConfigUpdates  = inboundUpdates.With(typeTag.Value("config"))
+       inboundEDSUpdates     = inboundUpdates.With(typeTag.Value("eds"))
+       inboundServiceUpdates = inboundUpdates.With(typeTag.Value("svc"))
+       inboundServiceDeletes = inboundUpdates.With(typeTag.Value("svcdelete"))
+
+       dubbodSDSCertificateErrors = monitoring.NewSum(
+               "dubbod_sds_certificate_errors_total",
+               "Total number of failures to fetch SDS key and certificate.",
+       )
+
+       configSizeBytes = monitoring.NewDistribution(
+               "dubbod_xds_config_size_bytes",
+               "Distribution of configuration sizes pushed to clients",
+               []float64{1, 10000, 1000000, 4000000, 10000000, 40000000},
+               monitoring.WithUnit(monitoring.Bytes),
+               monitoring.WithLabels("type"),
+       )
+)
+
+func recordXDSClients(version string, delta float64) {
+       xdsClientTrackerMutex.Lock()
+       defer xdsClientTrackerMutex.Unlock()
+       xdsClientTracker[version] += delta
+       
xdsClients.With(versionTag.Value(version)).Record(xdsClientTracker[version])
+}
+
+func recordPushTriggers(reasons model.ReasonStats) {
+       for r, cnt := range reasons {
+               
pushTriggers.With(typeTag.Value(string(r))).RecordInt(int64(cnt))
+       }
+}
+
+func isUnexpectedError(err error) bool {
+       s, ok := status.FromError(err)
+       isError := s.Code() != codes.Unavailable && s.Code() != codes.Canceled
+       return !ok || isError
+}
+
+func recordSendError(xdsType string, err error) bool {
+       if isUnexpectedError(err) {
+               switch xdsType {
+               case v3.ListenerType:
+                       ldsSendErrPushes.Increment()
+               case v3.ClusterType:
+                       cdsSendErrPushes.Increment()
+               case v3.EndpointType:
+                       edsSendErrPushes.Increment()
+               case v3.RouteType:
+                       rdsSendErrPushes.Increment()
+               }
+               return true
+       }
+       return false
+}
+
+func recordPushTime(xdsType string, duration time.Duration) {
+       metricType := v3.GetMetricType(xdsType)
+       pushTime.With(typeTag.Value(metricType)).Record(duration.Seconds())
+       pushes.With(typeTag.Value(metricType)).Increment()
+}
+
+func recordInboundConfigUpdate() {
+       inboundConfigUpdates.Increment()
+}
+
+func recordInboundEDSUpdate() {
+       inboundEDSUpdates.Increment()
+}
+
+func recordInboundServiceUpdate() {
+       inboundServiceUpdates.Increment()
+}
+
+func recordInboundServiceDelete() {
+       inboundServiceDeletes.Increment()
+}
diff --git a/dubbod/discovery/pkg/xds/v3/model.go 
b/dubbod/discovery/pkg/xds/v3/model.go
index 78664a91..e0ad5748 100644
--- a/dubbod/discovery/pkg/xds/v3/model.go
+++ b/dubbod/discovery/pkg/xds/v3/model.go
@@ -30,3 +30,7 @@ const (
 func GetShortType(typeURL string) string {
        return model.GetShortType(typeURL)
 }
+
+func GetMetricType(typeURL string) string {
+       return model.GetMetricType(typeURL)
+}
diff --git a/pkg/model/xds.go b/pkg/model/xds.go
index c66d02a0..453a24b3 100644
--- a/pkg/model/xds.go
+++ b/pkg/model/xds.go
@@ -46,3 +46,22 @@ func GetShortType(typeURL string) string {
                return typeURL
        }
 }
+
+func GetMetricType(typeURL string) string {
+       switch typeURL {
+       case ClusterType:
+               return "cds"
+       case ListenerType:
+               return "lds"
+       case RouteType:
+               return "rds"
+       case EndpointType:
+               return "eds"
+       case SecretType:
+               return "sds"
+       case ProxyConfigType:
+               return "pcds"
+       default:
+               return typeURL
+       }
+}
diff --git a/pkg/monitoring/monitoring.go b/pkg/monitoring/monitoring.go
new file mode 100644
index 00000000..82b5e0e9
--- /dev/null
+++ b/pkg/monitoring/monitoring.go
@@ -0,0 +1,468 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package monitoring
+
+import (
+       "context"
+       "sync"
+
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+// Unit encodes the standard name for describing the quantity
+// measured by a Metric (if applicable).
+type Unit string
+
+// Predefined units for use with the monitoring package.
+const (
+       None         Unit = "1"
+       Bytes        Unit = "By"
+       Seconds      Unit = "s"
+       Milliseconds Unit = "ms"
+)
+
+// A Metric collects numerical observations.
+type Metric interface {
+       // Increment records a value of 1 for the current measure.
+       Increment()
+
+       // Decrement records a value of -1 for the current measure.
+       Decrement()
+
+       // Name returns the name value of a Metric.
+       Name() string
+
+       // Record makes an observation of the provided value for the given 
measure.
+       Record(value float64)
+
+       // RecordInt makes an observation of the provided value for the measure.
+       RecordInt(value int64)
+
+       // With creates a new Metric, with the LabelValues provided.
+       With(labelValues ...LabelValue) Metric
+}
+
+// A Label provides a named dimension for a Metric.
+type Label struct {
+       Name string
+}
+
+// A LabelValue represents a Label with a specific value.
+type LabelValue struct {
+       Name  string
+       Value string
+}
+
+// Value creates a LabelValue for this Label.
+func (l Label) Value(value string) LabelValue {
+       return LabelValue{Name: l.Name, Value: value}
+}
+
+// CreateLabel creates a new Label.
+func CreateLabel(name string) Label {
+       return Label{Name: name}
+}
+
+// Options encode changes to the options passed to a Metric at creation time.
+type Options func(*options)
+
+type options struct {
+       unit   Unit
+       labels []string
+}
+
+// WithUnit provides configuration options for a new Metric.
+func WithUnit(unit Unit) Options {
+       return func(opts *options) {
+               opts.unit = unit
+       }
+}
+
+// WithLabels provides configuration options for a new Metric with predefined 
label names.
+func WithLabels(labels ...string) Options {
+       return func(opts *options) {
+               opts.labels = labels
+       }
+}
+
+// NewSum creates a new Metric with an aggregation type of Sum.
+func NewSum(name, description string, opts ...Options) Metric {
+       return newMetric(name, description, prometheus.CounterValue, opts...)
+}
+
+// NewGauge creates a new Metric with an aggregation type of Gauge.
+func NewGauge(name, description string, opts ...Options) Metric {
+       return newMetric(name, description, prometheus.GaugeValue, opts...)
+}
+
+// NewDistribution creates a new Metric with an aggregation type of 
Distribution.
+func NewDistribution(name, description string, bounds []float64, opts 
...Options) Metric {
+       return newDistribution(name, description, bounds, opts...)
+}
+
+// DerivedGauge is a Metric that is computed from a function.
+type DerivedGauge interface {
+       // Name returns the name value of a DerivedGauge.
+       Name() string
+
+       // ValueFrom sets the function to be called to get the value.
+       ValueFrom(valueFn func() float64) DerivedGauge
+}
+
+// NewDerivedGauge creates a new DerivedGauge.
+func NewDerivedGauge(name, description string, opts ...Options) DerivedGauge {
+       return newDerivedGauge(name, description, opts...)
+}
+
+var (
+       registry     = prometheus.NewRegistry()
+       registryLock sync.RWMutex
+)
+
+type metric struct {
+       name        string
+       description string
+       valueType   prometheus.ValueType
+       labelNames  []string
+       vec         *prometheus.GaugeVec
+       vecOnce     sync.Once
+       counter     prometheus.Counter
+       gauge       prometheus.Gauge
+}
+
+func newMetric(name, description string, valueType prometheus.ValueType, opts 
...Options) Metric {
+       o := &options{}
+       for _, opt := range opts {
+               opt(o)
+       }
+
+       m := &metric{
+               name:        name,
+               description: description,
+               valueType:   valueType,
+               labelNames:  o.labels,
+       }
+
+       // If labels are predefined, create the vector immediately. NOTE(review): both branches below are byte-identical — CounterValue metrics are registered as a GaugeVec and exposed with gauge type; consider prometheus.NewCounterVec for the counter case.
+       if len(o.labels) > 0 {
+               if valueType == prometheus.CounterValue {
+                       m.vec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+                               Name: name,
+                               Help: description,
+                       }, o.labels)
+               } else {
+                       m.vec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+                               Name: name,
+                               Help: description,
+                       }, o.labels)
+               }
+               registryLock.Lock()
+               registry.MustRegister(m.vec)
+               registryLock.Unlock()
+       } else {
+               // No predefined labels, create scalar metric
+               if valueType == prometheus.CounterValue {
+                       m.counter = 
prometheus.NewCounter(prometheus.CounterOpts{
+                               Name: name,
+                               Help: description,
+                       })
+                       registryLock.Lock()
+                       registry.MustRegister(m.counter)
+                       registryLock.Unlock()
+               } else {
+                       m.gauge = prometheus.NewGauge(prometheus.GaugeOpts{
+                               Name: name,
+                               Help: description,
+                       })
+                       registryLock.Lock()
+                       registry.MustRegister(m.gauge)
+                       registryLock.Unlock()
+               }
+       }
+
+       return m
+}
+
+func (m *metric) Increment() {
+       m.Record(1)
+}
+
+func (m *metric) Decrement() {
+       m.Record(-1)
+}
+
+func (m *metric) Name() string {
+       return m.name
+}
+
+func (m *metric) Record(value float64) {
+       if m.counter != nil {
+               m.counter.Add(value)
+       } else if m.gauge != nil {
+               if value >= 0 {
+                       m.gauge.Set(value)
+               } else {
+                       m.gauge.Add(value)
+               }
+       }
+}
+
+func (m *metric) RecordInt(value int64) {
+       m.Record(float64(value))
+}
+
+func (m *metric) With(labelValues ...LabelValue) Metric {
+       // If vector already exists (predefined labels), use it directly
+       if m.vec != nil {
+               labels := make(prometheus.Labels)
+               for _, lv := range labelValues {
+                       labels[lv.Name] = lv.Value
+               }
+               return &labeledMetric{
+                       parent: m,
+                       gauge:  m.vec.With(labels),
+                       labels: labels,
+               }
+       }
+
+       // Otherwise, lazily create the vector on the first With call, deriving label names from the supplied LabelValues.
+       m.vecOnce.Do(func() {
+               labelNames := make([]string, len(labelValues))
+               for i, lv := range labelValues {
+                       labelNames[i] = lv.Name
+               }
+
+               if m.valueType == prometheus.CounterValue {
+                       m.vec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+                               Name: m.name,
+                               Help: m.description,
+                       }, labelNames)
+               } else {
+                       m.vec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+                               Name: m.name,
+                               Help: m.description,
+                       }, labelNames)
+               }
+
+               registryLock.Lock()
+               registry.MustRegister(m.vec)
+               registryLock.Unlock()
+       })
+
+       labels := make(prometheus.Labels)
+       for _, lv := range labelValues {
+               labels[lv.Name] = lv.Value
+       }
+
+       return &labeledMetric{
+               parent: m,
+               gauge:  m.vec.With(labels),
+               labels: labels,
+       }
+}
+
+type labeledMetric struct {
+       parent *metric
+       gauge  prometheus.Gauge
+       labels prometheus.Labels
+}
+
+func (lm *labeledMetric) Increment() {
+       lm.Record(1)
+}
+
+func (lm *labeledMetric) Decrement() {
+       lm.Record(-1)
+}
+
+func (lm *labeledMetric) Name() string {
+       return lm.parent.name
+}
+
+func (lm *labeledMetric) Record(value float64) {
+       if lm.parent.valueType == prometheus.CounterValue {
+               lm.gauge.Add(value)
+       } else {
+               if value >= 0 {
+                       lm.gauge.Set(value)
+               } else {
+                       lm.gauge.Add(value)
+               }
+       }
+}
+
+func (lm *labeledMetric) RecordInt(value int64) {
+       lm.Record(float64(value))
+}
+
+func (lm *labeledMetric) With(labelValues ...LabelValue) Metric {
+       newLabels := make(prometheus.Labels)
+       for k, v := range lm.labels {
+               newLabels[k] = v
+       }
+       for _, lv := range labelValues {
+               newLabels[lv.Name] = lv.Value
+       }
+
+       return &labeledMetric{
+               parent: lm.parent,
+               gauge:  lm.parent.vec.With(newLabels),
+               labels: newLabels,
+       }
+}
+
+type distribution struct {
+       name        string
+       description string
+       histogram   *prometheus.HistogramVec
+       histOnce    sync.Once
+       bounds      []float64
+}
+
+func newDistribution(name, description string, bounds []float64, opts 
...Options) Metric {
+       d := &distribution{
+               name:        name,
+               description: description,
+               bounds:      bounds,
+       }
+
+       d.histOnce.Do(func() {
+               d.histogram = 
prometheus.NewHistogramVec(prometheus.HistogramOpts{
+                       Name:    name,
+                       Help:    description,
+                       Buckets: bounds,
+               }, []string{})
+
+               registryLock.Lock()
+               registry.MustRegister(d.histogram)
+               registryLock.Unlock()
+       })
+
+       return &distributionMetric{
+               parent:    d,
+               histogram: d.histogram.WithLabelValues(),
+       }
+}
+
+type distributionMetric struct {
+       parent    *distribution
+       histogram prometheus.Observer
+       labels    prometheus.Labels
+}
+
+func (dm *distributionMetric) Increment() {
+       dm.Record(1)
+}
+
+func (dm *distributionMetric) Decrement() {
+       dm.Record(-1)
+}
+
+func (dm *distributionMetric) Name() string {
+       return dm.parent.name
+}
+
+func (dm *distributionMetric) Record(value float64) {
+       dm.histogram.Observe(value)
+}
+
+func (dm *distributionMetric) RecordInt(value int64) {
+       dm.Record(float64(value))
+}
+
+func (dm *distributionMetric) With(labelValues ...LabelValue) Metric {
+       if len(labelValues) == 0 {
+               return dm
+       }
+
+       // Create new histogram vec with labels if needed
+       labelNames := make([]string, len(labelValues))
+       labels := make(prometheus.Labels)
+       for i, lv := range labelValues {
+               labelNames[i] = lv.Name
+               labels[lv.Name] = lv.Value
+       }
+
+       // NOTE(review): registering a second collector under the same metric name panics in MustRegister — confirm With is never called after the unlabeled histogram from newDistribution has been registered.
+       if dm.labels == nil {
+               newHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
+                       Name:    dm.parent.name,
+                       Help:    dm.parent.description,
+                       Buckets: dm.parent.bounds,
+               }, labelNames)
+
+               registryLock.Lock()
+               registry.MustRegister(newHist)
+               registryLock.Unlock()
+
+               dm.parent.histogram = newHist
+       }
+
+       return &distributionMetric{
+               parent:    dm.parent,
+               histogram: dm.parent.histogram.With(labels),
+               labels:    labels,
+       }
+}
+
+type derivedGauge struct {
+       name        string
+       description string
+       valueFn     func() float64
+       collector   prometheus.Collector
+}
+
+func newDerivedGauge(name, description string, opts ...Options) DerivedGauge {
+       dg := &derivedGauge{
+               name:        name,
+               description: description,
+       }
+       return dg
+}
+
+func (dg *derivedGauge) Name() string {
+       return dg.name
+}
+
+func (dg *derivedGauge) ValueFrom(valueFn func() float64) DerivedGauge {
+       dg.valueFn = valueFn
+
+       // Register a GaugeFunc that evaluates valueFn at scrape time.
+       dg.collector = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+               Name: dg.name,
+               Help: dg.description,
+       }, valueFn)
+
+       registryLock.Lock()
+       registry.MustRegister(dg.collector)
+       registryLock.Unlock()
+
+       return dg
+}
+
+// RegisterPrometheusExporter registers the Prometheus exporter.
+func RegisterPrometheusExporter(ctx context.Context) error {
+       // No-op: metrics register themselves with the package-level registry at creation time, so no exporter setup is needed here.
+       return nil
+}
+
+// GetRegistry returns the Prometheus registry.
+func GetRegistry() *prometheus.Registry {
+       registryLock.RLock()
+       defer registryLock.RUnlock()
+       return registry
+}
diff --git a/pkg/version/version.go b/pkg/version/version.go
index 6f8c60ac..f751e2e8 100644
--- a/pkg/version/version.go
+++ b/pkg/version/version.go
@@ -81,6 +81,10 @@ func (b BuildInfo) FormatDetailedProductInfo() string {
        )
 }
 
+func (b BuildInfo) String() string {
+       return fmt.Sprintf("%s-%s-%s", b.Version, b.GitTag, 
shortCommit(b.GitCommit))
+}
+
 func (b BuildInfo) AsMap() map[string]string {
        res := map[string]string{
                "product":    b.Product,
diff --git a/pkg/xds/monitoring.go b/pkg/xds/monitoring.go
index 9ef9d4cb..8374ff84 100644
--- a/pkg/xds/monitoring.go
+++ b/pkg/xds/monitoring.go
@@ -16,9 +16,99 @@
 
 package xds
 
-import dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
+import (
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/dubbo-kubernetes/pkg/log"
+       "github.com/apache/dubbo-kubernetes/pkg/model"
+       "github.com/apache/dubbo-kubernetes/pkg/monitoring"
+)
 
// Logging scope and the Prometheus metrics exported by the ads/xds layer.
var (
	// Log is the exported logging scope for ads debugging.
	Log = log.RegisterScope("ads", "ads debugging")
	// xdsLog is a package-internal alias for Log.
	xdsLog = Log

	// Label keys attached to the reject metrics below.
	errTag  = monitoring.CreateLabel("err")
	nodeTag = monitoring.CreateLabel("node")
	typeTag = monitoring.CreateLabel("type")

	// TotalXDSInternalErrors counts internal (non-client-caused) xDS errors.
	TotalXDSInternalErrors = monitoring.NewSum(
		"dubbod_total_xds_internal_errors",
		"Total number of internal XDS errors in dubbod.",
	)

	// ExpiredNonce counts requests carrying a nonce we no longer recognize.
	ExpiredNonce = monitoring.NewSum(
		"dubbod_xds_expired_nonce",
		"Total number of XDS requests with an expired nonce.",
	)

	// Per-type NACK gauges, labeled by node and error code.
	cdsReject = monitoring.NewGauge(
		"dubbod_xds_cds_reject",
		"Dubbod rejected CDS configs.",
	)

	edsReject = monitoring.NewGauge(
		"dubbod_xds_eds_reject",
		"Dubbod rejected EDS.",
	)

	ldsReject = monitoring.NewGauge(
		"dubbod_xds_lds_reject",
		"Dubbod rejected LDS.",
	)

	rdsReject = monitoring.NewGauge(
		"dubbod_xds_rds_reject",
		"Dubbod rejected RDS.",
	)

	// totalXDSRejects counts all NACKs across resource types (type label).
	totalXDSRejects = monitoring.NewSum(
		"dubbod_total_xds_rejects",
		"Total number of XDS responses from dubbod rejected by proxy.",
	)

	// ResponseWriteTimeouts counts pushes that timed out while writing.
	ResponseWriteTimeouts = monitoring.NewSum(
		"dubbod_xds_write_timeout",
		"Dubbod XDS response write timeouts.",
	)

	// sendTime is a histogram of per-push send durations, in seconds.
	sendTime = monitoring.NewDistribution(
		"dubbod_xds_send_time",
		"Total time in seconds Dubbod takes to send generated configuration.",
		[]float64{.01, .1, 1, 3, 5, 10, 20, 30},
	)
)
+
+func IncrementXDSRejects(xdsType string, node, errCode string) {
+       metricType := model.GetShortType(xdsType)
+       totalXDSRejects.With(typeTag.Value(metricType)).Increment()
+       switch xdsType {
+       case model.ListenerType:
+               ldsReject.With(nodeTag.Value(node), 
errTag.Value(errCode)).Increment()
+       case model.ClusterType:
+               cdsReject.With(nodeTag.Value(node), 
errTag.Value(errCode)).Increment()
+       case model.EndpointType:
+               edsReject.With(nodeTag.Value(node), 
errTag.Value(errCode)).Increment()
+       case model.RouteType:
+               rdsReject.With(nodeTag.Value(node), 
errTag.Value(errCode)).Increment()
+       }
+}
+
// RecordSendTime records how long one xDS push took, in seconds, into the
// dubbod_xds_send_time distribution.
func RecordSendTime(duration time.Duration) {
	sendTime.Record(duration.Seconds())
}
+
// RecordSendError bumps the internal-error counter when an xDS send fails
// for a reason other than a routine client disconnect (see isUnexpectedError).
// xdsType is currently unused; presumably kept so a per-type error metric
// can be added without changing callers — TODO confirm.
func RecordSendError(xdsType string, err error) {
	if isUnexpectedError(err) {
		TotalXDSInternalErrors.Increment()
	}
}
+
+func isUnexpectedError(err error) bool {
+       s, ok := status.FromError(err)
+       isError := s.Code() != codes.Unavailable && s.Code() != codes.Canceled
+       return !ok || isError
+}
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index e5dd1015..14ec47ae 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -166,16 +166,16 @@ func Receive(ctx ConnectionContext) {
                req, err := con.stream.Recv()
                if err != nil {
                        if dubbogrpc.GRPCErrorType(err) != 
dubbogrpc.UnexpectedError {
-                               log.Infof("%q %s terminated", con.peerAddr, 
con.conID)
+                               xdsLog.Infof("%q %s terminated", con.peerAddr, 
con.conID)
                                return
                        }
                        con.errorChan <- err
-                       log.Errorf("%q %s terminated with error: %v", 
con.peerAddr, con.conID, err)
+                       xdsLog.Errorf("%q %s terminated with error: %v", 
con.peerAddr, con.conID, err)
                        return
                }
                if firstRequest {
                        if req.TypeUrl == model.HealthInfoType {
-                               log.Warnf("%q %s send health check probe before 
normal xDS request", con.peerAddr, con.conID)
+                               xdsLog.Warnf("%q %s send health check probe 
before normal xDS request", con.peerAddr, con.conID)
                                continue
                        }
                        firstRequest = false
@@ -195,13 +195,13 @@ func Receive(ctx ConnectionContext) {
                if len(req.ResourceNames) > 0 {
                        resourceNamesStr = " [" + 
strings.Join(req.ResourceNames, ", ") + "]"
                }
-               log.Infof("%s: RAW REQ %s resources:%d nonce:%s%s",
+               xdsLog.Infof("%s: RAW REQ %s resources:%d nonce:%s%s",
                        model.GetShortType(req.TypeUrl), con.conID, 
len(req.ResourceNames), req.ResponseNonce, resourceNamesStr)
 
                select {
                case con.reqChan <- req:
                case <-con.stream.Context().Done():
-                       log.Infof("%q %s terminated with stream closed", 
con.peerAddr, con.conID)
+                       xdsLog.Infof("%q %s terminated with stream closed", 
con.peerAddr, con.conID)
                        return
                }
        }
@@ -209,8 +209,14 @@ func Receive(ctx ConnectionContext) {
 
 func Send(ctx ConnectionContext, res *discovery.DiscoveryResponse) error {
        conn := ctx.XdsConnection()
+       startTime := time.Now()
        err := conn.stream.Send(res)
+       sendDuration := time.Since(startTime)
+       
        if err == nil {
+               // Record send time metric
+               RecordSendTime(sendDuration)
+               
                if res.Nonce != "" {
                        ctx.Watcher().UpdateWatchedResource(res.TypeUrl, 
func(wr *WatchedResource) *WatchedResource {
                                if wr == nil {
@@ -221,6 +227,9 @@ func Send(ctx ConnectionContext, res 
*discovery.DiscoveryResponse) error {
                                return wr
                        })
                }
+       } else {
+               // Record send error
+               RecordSendError(res.TypeUrl, err)
        }
        return err
 }
@@ -266,7 +275,7 @@ func ShouldRespond(w Watcher, id string, request 
*discovery.DiscoveryRequest) (b
 
        if request.ErrorDetail != nil {
                errCode := codes.Code(request.ErrorDetail.Code)
-               log.Warnf("%s: ACK ERROR %s %s:%s", stype, id, 
errCode.String(), request.ErrorDetail.GetMessage())
+               Log.Warnf("%s: ACK ERROR %s %s:%s", stype, id, 
errCode.String(), request.ErrorDetail.GetMessage())
                w.UpdateWatchedResource(request.TypeUrl, func(wr 
*WatchedResource) *WatchedResource {
                        wr.LastError = request.ErrorDetail.GetMessage()
                        return wr
@@ -275,7 +284,7 @@ func ShouldRespond(w Watcher, id string, request 
*discovery.DiscoveryRequest) (b
        }
 
        if shouldUnsubscribe(request) {
-               log.Debugf("%s: UNSUBSCRIBE %s %s %s", stype, id, 
request.VersionInfo, request.ResponseNonce)
+               Log.Debugf("%s: UNSUBSCRIBE %s %s %s", stype, id, 
request.VersionInfo, request.ResponseNonce)
                w.DeleteWatchedResource(request.TypeUrl)
                return false, emptyResourceDelta
        }
@@ -290,7 +299,7 @@ func ShouldRespond(w Watcher, id string, request 
*discovery.DiscoveryRequest) (b
                // No previous info - this is a new request
                if request.ResponseNonce == "" {
                        // Initial request with no nonce
-                       log.Debugf("%s: INIT %s %s", stype, id, 
request.VersionInfo)
+                       Log.Debugf("%s: INIT %s %s", stype, id, 
request.VersionInfo)
                        w.NewWatchedResource(request.TypeUrl, 
request.ResourceNames)
                        return true, emptyResourceDelta
                }
@@ -299,7 +308,7 @@ func ShouldRespond(w Watcher, id string, request 
*discovery.DiscoveryRequest) (b
                // they're likely stale/expired requests that should be ignored
                // Only treat as reconnect if this is the first request we see 
with a nonce
                // For subsequent requests with nonces we don't recognize, 
treat as expired
-               log.Debugf("%s: REQ %s Unknown nonce (no previous info): %s, 
treating as expired/stale", stype, id, request.ResponseNonce)
+               Log.Debugf("%s: REQ %s Unknown nonce (no previous info): %s, 
treating as expired/stale", stype, id, request.ResponseNonce)
                // Create the watched resource but don't respond - let the 
client retry with empty nonce
                w.NewWatchedResource(request.TypeUrl, request.ResourceNames)
                return false, emptyResourceDelta
@@ -308,7 +317,7 @@ func ShouldRespond(w Watcher, id string, request 
*discovery.DiscoveryRequest) (b
        // We have previous info - check nonce match
        if request.ResponseNonce == "" {
                // Client sent empty nonce but we have previous info - this is 
a new request
-               log.Debugf("%s: INIT (empty nonce) %s %s", stype, id, 
request.VersionInfo)
+               Log.Debugf("%s: INIT (empty nonce) %s %s", stype, id, 
request.VersionInfo)
                w.NewWatchedResource(request.TypeUrl, request.ResourceNames)
                return true, emptyResourceDelta
        }
@@ -323,7 +332,7 @@ func ShouldRespond(w Watcher, id string, request 
*discovery.DiscoveryRequest) (b
                // as a resource change rather than an ACK so the new clusters 
get a response.
                previousResourcesCopy := previousInfo.ResourceNames.Copy()
                if !newResources.Equals(previousResourcesCopy) && 
len(newResources) > 0 {
-                       log.Infof("%s: REQ %s nonce mismatch (got %s, sent %s) 
but resources changed -> responding",
+                       Log.Infof("%s: REQ %s nonce mismatch (got %s, sent %s) 
but resources changed -> responding",
                                stype, id, request.ResponseNonce, 
previousInfo.NonceSent)
                        added := newResources.Difference(previousResourcesCopy)
                        w.UpdateWatchedResource(request.TypeUrl, func(wr 
*WatchedResource) *WatchedResource {
@@ -346,11 +355,11 @@ func ShouldRespond(w Watcher, id string, request 
*discovery.DiscoveryRequest) (b
                // Expired/stale nonce - don't respond, just log at debug level
                if previousInfo.NonceSent == "" {
                        // We never sent a nonce, but client sent one - this is 
unusual but treat as expired
-                       log.Debugf("%s: REQ %s Expired nonce received %s, but 
we never sent any nonce", stype,
+                       Log.Debugf("%s: REQ %s Expired nonce received %s, but 
we never sent any nonce", stype,
                                id, request.ResponseNonce)
                } else {
                        // Normal case: client sent stale nonce
-                       log.Debugf("%s: REQ %s Expired nonce received %s, sent 
%s", stype,
+                       Log.Debugf("%s: REQ %s Expired nonce received %s, sent 
%s", stype,
                                id, request.ResponseNonce, 
previousInfo.NonceSent)
                }
                return false, emptyResourceDelta
@@ -387,7 +396,7 @@ func ShouldRespond(w Watcher, id string, request 
*discovery.DiscoveryRequest) (b
                // Check if this is proxyless by attempting to get the proxy 
from watcher
                // For now, we'll check if this is a proxyless scenario by the 
context
                // If previous resources existed and client sends empty names 
with matching nonce, it's an ACK
-               log.Debugf("%s: wildcard request after specific resources 
(prev: %d resources, nonce: %s), treating as ACK",
+               Log.Debugf("%s: wildcard request after specific resources 
(prev: %d resources, nonce: %s), treating as ACK",
                        stype, len(previousResources), previousInfo.NonceSent)
                // Update ResourceNames to keep previous resources (don't clear 
them)
                w.UpdateWatchedResource(request.TypeUrl, func(wr 
*WatchedResource) *WatchedResource {
@@ -405,15 +414,15 @@ func ShouldRespond(w Watcher, id string, request 
*discovery.DiscoveryRequest) (b
        // We should always respond "alwaysRespond" marked requests to let xDS 
clients finish warming
        // even though Nonce match and it looks like an ACK.
        if alwaysRespond {
-               log.Infof("%s: FORCE RESPONSE %s for warming.", stype, id)
+               Log.Infof("%s: FORCE RESPONSE %s for warming.", stype, id)
                return true, emptyResourceDelta
        }
 
        if len(removed) == 0 && len(added) == 0 {
-               log.Debugf("%s: ACK %s %s %s", stype, id, request.VersionInfo, 
request.ResponseNonce)
+               Log.Debugf("%s: ACK %s %s %s", stype, id, request.VersionInfo, 
request.ResponseNonce)
                return false, emptyResourceDelta
        }
-       log.Debugf("%s: RESOURCE CHANGE added %v removed %v %s %s %s", stype,
+       Log.Debugf("%s: RESOURCE CHANGE added %v removed %v %s %s %s", stype,
                added, removed, id, request.VersionInfo, request.ResponseNonce)
 
        // For non wildcard resource, if no new resources are subscribed, it 
means we do not need to push.


Reply via email to