This is an automated email from the ASF dual-hosted git repository.

xuetaoli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/develop by this push:
     new e7770c9ef feat(probe): add Kubernetes probe support with liveness, 
readiness, and startup checks (#3213)
e7770c9ef is described below

commit e7770c9effa4ca02cb9a69c45ba1124d3325a5c6
Author: Xuetao Li <[email protected]>
AuthorDate: Fri Mar 13 11:50:44 2026 +0800

    feat(probe): add Kubernetes probe support with liveness, readiness, and 
startup checks (#3213)
    
    * feat(probe): add Kubernetes probe support with liveness, readiness, and 
startup checks
---
 common/constant/key.go      |   7 +++
 common/constant/metric.go   |  11 ++++
 compat.go                   |  30 +++++++++
 config/metric_config.go     |  10 +++
 global/metric_config.go     |  43 ++++++++++++-
 metrics/options.go          |  38 ++++++++++++
 metrics/probe/http.go       |  73 ++++++++++++++++++++++
 metrics/probe/http_test.go  |  84 ++++++++++++++++++++++++++
 metrics/probe/probe.go      |  98 ++++++++++++++++++++++++++++++
 metrics/probe/probe_test.go | 103 +++++++++++++++++++++++++++++++
 metrics/probe/server.go     | 144 ++++++++++++++++++++++++++++++++++++++++++++
 metrics/probe/state.go      |  58 ++++++++++++++++++
 metrics/probe/state_test.go |  50 +++++++++++++++
 server/options.go           |   6 ++
 server/server.go            |   6 ++
 15 files changed, 760 insertions(+), 1 deletion(-)

diff --git a/common/constant/key.go b/common/constant/key.go
index d2dd1ebdf..1880faf67 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -476,6 +476,13 @@ const (
        PrometheusPushgatewayPasswordKey     = "prometheus.pushgateway.password"
        PrometheusPushgatewayPushIntervalKey = 
"prometheus.pushgateway.push.interval"
        PrometheusPushgatewayJobKey          = "prometheus.pushgateway.job"
+
+       ProbeEnabledKey          = "probe.enabled"
+       ProbePortKey             = "probe.port"
+       ProbeLivenessPathKey     = "probe.liveness.path"
+       ProbeReadinessPathKey    = "probe.readiness.path"
+       ProbeStartupPathKey      = "probe.startup.path"
+       ProbeUseInternalStateKey = "probe.use-internal-state"
 )
 
 // default meta cache config
diff --git a/common/constant/metric.go b/common/constant/metric.go
index 4b35721eb..212b55517 100644
--- a/common/constant/metric.go
+++ b/common/constant/metric.go
@@ -17,6 +17,10 @@
 
 package constant
 
+import (
+       "time"
+)
+
 // metrics type
 const (
        MetricsRegistry     = "dubbo.metrics.registry"
@@ -53,6 +57,13 @@ const (
        PrometheusDefaultMetricsPort        = "9090"
        PrometheusDefaultPushInterval       = 30
        PrometheusDefaultJobName            = "default_dubbo_job"
+       ProbeDefaultPort                    = "22222"
+       ProbeDefaultLivenessPath            = "/live"
+       ProbeDefaultReadinessPath           = "/ready"
+       ProbeDefaultStartupPath             = "/startup"
+       ProbeReadHeaderTimeout              = 5 * time.Second
+       ProbeWriteTimeout                   = 10 * time.Second
+       ProbeIdleTimeout                    = 30 * time.Second
        MetricFilterStartTime               = "metric_filter_start_time"
 )
 
diff --git a/compat.go b/compat.go
index 7844ba5b1..5af9d3913 100644
--- a/compat.go
+++ b/compat.go
@@ -379,6 +379,21 @@ func compatMetricConfig(c *global.MetricsConfig) 
*config.MetricsConfig {
                EnableMetadata:     c.EnableMetadata,
                EnableRegistry:     c.EnableRegistry,
                EnableConfigCenter: c.EnableConfigCenter,
+               Probe:              compatMetricProbeConfig(c.Probe),
+       }
+}
+
+// compatMetricProbeConfig copies a global.ProbeConfig field by field into the
+// equivalent config.ProbeConfig. A nil input yields nil. Pointer fields are
+// assigned as-is, so the result shares them with c.
+func compatMetricProbeConfig(c *global.ProbeConfig) *config.ProbeConfig {
+       if c == nil {
+               return nil
+       }
+       return &config.ProbeConfig{
+               Enabled:          c.Enabled,
+               Port:             c.Port,
+               LivenessPath:     c.LivenessPath,
+               ReadinessPath:    c.ReadinessPath,
+               StartupPath:      c.StartupPath,
+               UseInternalState: c.UseInternalState,
        }
 }
 
@@ -916,6 +931,21 @@ func compatGlobalMetricConfig(c *config.MetricsConfig) 
*global.MetricsConfig {
                EnableMetadata:     c.EnableMetadata,
                EnableRegistry:     c.EnableRegistry,
                EnableConfigCenter: c.EnableConfigCenter,
+               Probe:              compatGlobalMetricProbeConfig(c.Probe),
+       }
+}
+
+// compatGlobalMetricProbeConfig copies a config.ProbeConfig field by field
+// into the equivalent global.ProbeConfig. A nil input yields nil. Pointer
+// fields are assigned as-is, so the result shares them with c.
+func compatGlobalMetricProbeConfig(c *config.ProbeConfig) *global.ProbeConfig {
+       if c == nil {
+               return nil
+       }
+       return &global.ProbeConfig{
+               Enabled:          c.Enabled,
+               Port:             c.Port,
+               LivenessPath:     c.LivenessPath,
+               ReadinessPath:    c.ReadinessPath,
+               StartupPath:      c.StartupPath,
+               UseInternalState: c.UseInternalState,
        }
 }
 
diff --git a/config/metric_config.go b/config/metric_config.go
index 628b0f6a1..12875d7d2 100644
--- a/config/metric_config.go
+++ b/config/metric_config.go
@@ -44,6 +44,7 @@ type MetricsConfig struct {
        EnableConfigCenter *bool             `default:"false" 
yaml:"enable-config-center" json:"enable-config-center,omitempty" 
property:"enable-config-center"`
        Prometheus         *PrometheusConfig `yaml:"prometheus" 
json:"prometheus" property:"prometheus"`
        Aggregation        *AggregateConfig  `yaml:"aggregation" 
json:"aggregation" property:"aggregation"`
+       Probe              *ProbeConfig      `yaml:"probe" json:"probe" 
property:"probe"`
        rootConfig         *RootConfig
 }
 
@@ -58,6 +59,15 @@ type PrometheusConfig struct {
        Pushgateway *PushgatewayConfig `yaml:"pushgateway" 
json:"pushgateway,omitempty" property:"pushgateway"`
 }
 
+// ProbeConfig holds the probe settings read from the "probe" section of the
+// metrics configuration: whether probes are enabled, the port, the three
+// endpoint paths, and whether the internal state flags are used. The struct
+// tags carry the default value of each field.
+type ProbeConfig struct {
+       Enabled          *bool  `default:"false" yaml:"enabled" 
json:"enabled,omitempty" property:"enabled"`
+       Port             string `default:"22222" yaml:"port" 
json:"port,omitempty" property:"port"`
+       LivenessPath     string `default:"/live" yaml:"liveness-path" 
json:"liveness-path,omitempty" property:"liveness-path"`
+       ReadinessPath    string `default:"/ready" yaml:"readiness-path" 
json:"readiness-path,omitempty" property:"readiness-path"`
+       StartupPath      string `default:"/startup" yaml:"startup-path" 
json:"startup-path,omitempty" property:"startup-path"`
+       UseInternalState *bool  `default:"true" yaml:"use-internal-state" 
json:"use-internal-state,omitempty" property:"use-internal-state"`
+}
+
 type Exporter struct {
        Enabled *bool `default:"true" yaml:"enabled" json:"enabled,omitempty" 
property:"enabled"`
 }
diff --git a/global/metric_config.go b/global/metric_config.go
index 88f296463..401d00708 100644
--- a/global/metric_config.go
+++ b/global/metric_config.go
@@ -28,6 +28,7 @@ type MetricsConfig struct {
        EnableMetadata     *bool             `default:"true" 
yaml:"enable-metadata" json:"enable-metadata,omitempty" 
property:"enable-metadata"`
        EnableRegistry     *bool             `default:"true" 
yaml:"enable-registry" json:"enable-registry,omitempty" 
property:"enable-registry"`
        EnableConfigCenter *bool             `default:"true" 
yaml:"enable-config-center" json:"enable-config-center,omitempty" 
property:"enable-config-center"`
+       Probe              *ProbeConfig      `yaml:"probe" json:"probe" 
property:"probe"`
 }
 
 type AggregateConfig struct {
@@ -41,6 +42,15 @@ type PrometheusConfig struct {
        Pushgateway *PushgatewayConfig `yaml:"pushgateway" 
json:"pushgateway,omitempty" property:"pushgateway"`
 }
 
+// ProbeConfig holds the probe settings read from the "probe" section of the
+// metrics configuration: whether probes are enabled, the port, the three
+// endpoint paths, and whether the internal state flags are used. The struct
+// tags carry the default value of each field.
+type ProbeConfig struct {
+       Enabled          *bool  `default:"false" yaml:"enabled" 
json:"enabled,omitempty" property:"enabled"`
+       Port             string `default:"22222" yaml:"port" 
json:"port,omitempty" property:"port"`
+       LivenessPath     string `default:"/live" yaml:"liveness-path" 
json:"liveness-path,omitempty" property:"liveness-path"`
+       ReadinessPath    string `default:"/ready" yaml:"readiness-path" 
json:"readiness-path,omitempty" property:"readiness-path"`
+       StartupPath      string `default:"/startup" yaml:"startup-path" 
json:"startup-path,omitempty" property:"startup-path"`
+       UseInternalState *bool  `default:"true" yaml:"use-internal-state" 
json:"use-internal-state,omitempty" property:"use-internal-state"`
+}
+
 type Exporter struct {
        Enabled *bool `default:"false" yaml:"enabled" json:"enabled,omitempty" 
property:"enabled"`
 }
@@ -57,7 +67,7 @@ type PushgatewayConfig struct {
 
 func DefaultMetricsConfig() *MetricsConfig {
        // return a new config without setting any field means there is not any 
default value for initialization
-       return &MetricsConfig{Prometheus: defaultPrometheusConfig(), 
Aggregation: defaultAggregateConfig()}
+       return &MetricsConfig{Prometheus: defaultPrometheusConfig(), 
Aggregation: defaultAggregateConfig(), Probe: defaultProbeConfig()}
 }
 
 // Clone a new MetricsConfig
@@ -100,6 +110,7 @@ func (c *MetricsConfig) Clone() *MetricsConfig {
                EnableMetadata:     newEnableMetadata,
                EnableRegistry:     newEnableRegistry,
                EnableConfigCenter: newEnableConfigCenter,
+               Probe:              c.Probe.Clone(),
        }
 }
 
@@ -155,6 +166,36 @@ func (c *PushgatewayConfig) Clone() *PushgatewayConfig {
        }
 }
 
+// defaultProbeConfig returns an empty, non-nil ProbeConfig; no field is
+// populated here.
+func defaultProbeConfig() *ProbeConfig {
+       return &ProbeConfig{}
+}
+
+// Clone returns a deep copy of c, or nil when c is nil. The *bool fields are
+// copied into freshly allocated values so the clone does not alias c.
+func (c *ProbeConfig) Clone() *ProbeConfig {
+       if c == nil {
+               return nil
+       }
+
+       // Duplicate the pointed-to booleans; a nil pointer stays nil.
+       var newEnabled *bool
+       if c.Enabled != nil {
+               newEnabled = new(bool)
+               *newEnabled = *c.Enabled
+       }
+       var newUseInternalState *bool
+       if c.UseInternalState != nil {
+               newUseInternalState = new(bool)
+               *newUseInternalState = *c.UseInternalState
+       }
+
+       return &ProbeConfig{
+               Enabled:          newEnabled,
+               Port:             c.Port,
+               LivenessPath:     c.LivenessPath,
+               ReadinessPath:    c.ReadinessPath,
+               StartupPath:      c.StartupPath,
+               UseInternalState: newUseInternalState,
+       }
+}
+
 func defaultAggregateConfig() *AggregateConfig {
        return &AggregateConfig{}
 }
diff --git a/metrics/options.go b/metrics/options.go
index 58542cc57..d494e6a33 100644
--- a/metrics/options.go
+++ b/metrics/options.go
@@ -152,3 +152,41 @@ func WithPath(path string) Option {
                opts.Metrics.Path = path
        }
 }
+
+// Below are options for probe
+
+// WithProbeEnabled returns an Option that sets Metrics.Probe.Enabled to true.
+// NOTE(review): dereferences opts.Metrics.Probe; assumes it is already
+// initialised — confirm against how Options is constructed.
+func WithProbeEnabled() Option {
+       return func(opts *Options) {
+               b := true
+               opts.Metrics.Probe.Enabled = &b
+       }
+}
+
+// WithProbePort returns an Option that stores port, formatted as a decimal
+// string, in Metrics.Probe.Port. The value is not range-checked here.
+func WithProbePort(port int) Option {
+       return func(opts *Options) {
+               opts.Metrics.Probe.Port = strconv.Itoa(port)
+       }
+}
+
+// WithProbeLivenessPath returns an Option that sets Metrics.Probe.LivenessPath
+// to path.
+func WithProbeLivenessPath(path string) Option {
+       return func(opts *Options) {
+               opts.Metrics.Probe.LivenessPath = path
+       }
+}
+
+// WithProbeReadinessPath returns an Option that sets
+// Metrics.Probe.ReadinessPath to path.
+func WithProbeReadinessPath(path string) Option {
+       return func(opts *Options) {
+               opts.Metrics.Probe.ReadinessPath = path
+       }
+}
+
+// WithProbeStartupPath returns an Option that sets Metrics.Probe.StartupPath
+// to path.
+func WithProbeStartupPath(path string) Option {
+       return func(opts *Options) {
+               opts.Metrics.Probe.StartupPath = path
+       }
+}
+
+// WithProbeUseInternalState returns an Option that sets
+// Metrics.Probe.UseInternalState to point at use.
+func WithProbeUseInternalState(use bool) Option {
+       return func(opts *Options) {
+               opts.Metrics.Probe.UseInternalState = &use
+       }
+}
diff --git a/metrics/probe/http.go b/metrics/probe/http.go
new file mode 100644
index 000000000..ce18b0245
--- /dev/null
+++ b/metrics/probe/http.go
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+       "errors"
+       "net/http"
+)
+
+var (
+       errNotReady   = errors.New("not ready")
+       errNotStarted = errors.New("not started")
+)
+
+// livenessHandler serves the liveness endpoint. Non-GET requests get 405; a
+// failing CheckLiveness yields 503 with the error text as the body; otherwise
+// it writes 200 with the body "ok".
+func livenessHandler(w http.ResponseWriter, r *http.Request) {
+       // Kubernetes HTTP probes only issue GET requests; reject anything else.
+       if r.Method != http.MethodGet {
+               http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+
+       if err := CheckLiveness(r.Context()); err != nil {
+               http.Error(w, err.Error(), http.StatusServiceUnavailable)
+               return
+       }
+       w.WriteHeader(http.StatusOK)
+       _, _ = w.Write([]byte("ok"))
+}
+
+// readinessHandler serves the readiness endpoint. Non-GET requests get 405; a
+// failing CheckReadiness yields 503 with the error text as the body;
+// otherwise it writes 200 with the body "ok".
+func readinessHandler(w http.ResponseWriter, r *http.Request) {
+       // Kubernetes HTTP probes only issue GET requests; reject anything else.
+       if r.Method != http.MethodGet {
+               http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+
+       if err := CheckReadiness(r.Context()); err != nil {
+               http.Error(w, err.Error(), http.StatusServiceUnavailable)
+               return
+       }
+       w.WriteHeader(http.StatusOK)
+       _, _ = w.Write([]byte("ok"))
+}
+
+// startupHandler serves the startup endpoint. Non-GET requests get 405; a
+// failing CheckStartup yields 503 with the error text as the body; otherwise
+// it writes 200 with the body "ok".
+func startupHandler(w http.ResponseWriter, r *http.Request) {
+       // Kubernetes HTTP probes only issue GET requests; reject anything else.
+       if r.Method != http.MethodGet {
+               http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+
+       if err := CheckStartup(r.Context()); err != nil {
+               http.Error(w, err.Error(), http.StatusServiceUnavailable)
+               return
+       }
+       w.WriteHeader(http.StatusOK)
+       _, _ = w.Write([]byte("ok"))
+}
diff --git a/metrics/probe/http_test.go b/metrics/probe/http_test.go
new file mode 100644
index 000000000..158386031
--- /dev/null
+++ b/metrics/probe/http_test.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+       "context"
+       "errors"
+       "net/http"
+       "net/http/httptest"
+       "testing"
+)
+
+// TestHandlersSuccess registers one passing check per probe type and verifies
+// that each handler answers a GET request with 200.
+func TestHandlersSuccess(t *testing.T) {
+       resetProbeState()
+
+       RegisterLiveness("ok", func(context.Context) error { return nil })
+       RegisterReadiness("ok", func(context.Context) error { return nil })
+       RegisterStartup("ok", func(context.Context) error { return nil })
+
+       req := httptest.NewRequest(http.MethodGet, "/live", nil)
+       rec := httptest.NewRecorder()
+       livenessHandler(rec, req)
+       if rec.Code != http.StatusOK {
+               t.Fatalf("expected 200 for liveness, got %d", rec.Code)
+       }
+
+       req = httptest.NewRequest(http.MethodGet, "/ready", nil)
+       rec = httptest.NewRecorder()
+       readinessHandler(rec, req)
+       if rec.Code != http.StatusOK {
+               t.Fatalf("expected 200 for readiness, got %d", rec.Code)
+       }
+
+       req = httptest.NewRequest(http.MethodGet, "/startup", nil)
+       rec = httptest.NewRecorder()
+       startupHandler(rec, req)
+       if rec.Code != http.StatusOK {
+               t.Fatalf("expected 200 for startup, got %d", rec.Code)
+       }
+}
+
+// TestHandlersFailure registers one failing check per probe type and verifies
+// that each handler answers a GET request with 503.
+func TestHandlersFailure(t *testing.T) {
+       resetProbeState()
+
+       RegisterLiveness("fail", func(context.Context) error { return 
errors.New("bad") })
+       RegisterReadiness("fail", func(context.Context) error { return 
errors.New("bad") })
+       RegisterStartup("fail", func(context.Context) error { return 
errors.New("bad") })
+
+       req := httptest.NewRequest(http.MethodGet, "/live", nil)
+       rec := httptest.NewRecorder()
+       livenessHandler(rec, req)
+       if rec.Code != http.StatusServiceUnavailable {
+               t.Fatalf("expected 503 for liveness, got %d", rec.Code)
+       }
+
+       req = httptest.NewRequest(http.MethodGet, "/ready", nil)
+       rec = httptest.NewRecorder()
+       readinessHandler(rec, req)
+       if rec.Code != http.StatusServiceUnavailable {
+               t.Fatalf("expected 503 for readiness, got %d", rec.Code)
+       }
+
+       req = httptest.NewRequest(http.MethodGet, "/startup", nil)
+       rec = httptest.NewRecorder()
+       startupHandler(rec, req)
+       if rec.Code != http.StatusServiceUnavailable {
+               t.Fatalf("expected 503 for startup, got %d", rec.Code)
+       }
+}
diff --git a/metrics/probe/probe.go b/metrics/probe/probe.go
new file mode 100644
index 000000000..0552ce957
--- /dev/null
+++ b/metrics/probe/probe.go
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+       "context"
+       "fmt"
+       "sync"
+)
+
+// CheckFunc returns nil when healthy.
+type CheckFunc func(context.Context) error
+
+var (
+       livenessMu  sync.RWMutex
+       readinessMu sync.RWMutex
+       startupMu   sync.RWMutex
+
+       livenessChecks  = make(map[string]CheckFunc)
+       readinessChecks = make(map[string]CheckFunc)
+       startupChecks   = make(map[string]CheckFunc)
+)
+
+// RegisterLiveness registers a liveness check under name. Calls with an empty name or a nil fn are ignored; registering an existing name replaces the earlier check.
+func RegisterLiveness(name string, fn CheckFunc) {
+       if name == "" || fn == nil {
+               return
+       }
+       livenessMu.Lock()
+       defer livenessMu.Unlock()
+       livenessChecks[name] = fn
+}
+
+// RegisterReadiness registers a readiness check under name. Calls with an empty name or a nil fn are ignored; registering an existing name replaces the earlier check.
+func RegisterReadiness(name string, fn CheckFunc) {
+       if name == "" || fn == nil {
+               return
+       }
+       readinessMu.Lock()
+       defer readinessMu.Unlock()
+       readinessChecks[name] = fn
+}
+
+// RegisterStartup registers a startup check under name. Calls with an empty name or a nil fn are ignored; registering an existing name replaces the earlier check.
+func RegisterStartup(name string, fn CheckFunc) {
+       if name == "" || fn == nil {
+               return
+       }
+       startupMu.Lock()
+       defer startupMu.Unlock()
+       startupChecks[name] = fn
+}
+
+// runChecks copies checks while holding mu's read lock, then runs every copied
+// check with ctx after the lock is released. It returns the first error it
+// meets, wrapped with the failing check's name, or nil when all checks pass
+// (including when there are none). Map iteration order is unspecified, so
+// which failure is reported when several checks fail is not deterministic.
+func runChecks(ctx context.Context, mu *sync.RWMutex, checks 
map[string]CheckFunc) error {
+       mu.RLock()
+       snapshot := make(map[string]CheckFunc, len(checks))
+       for k, v := range checks {
+               snapshot[k] = v
+       }
+       mu.RUnlock()
+
+       // Check functions run on the snapshot, outside the lock.
+       for name, fn := range snapshot {
+               if err := fn(ctx); err != nil {
+                       return fmt.Errorf("probe %s: %w", name, err)
+               }
+       }
+       return nil
+}
+
+// CheckLiveness runs the registered liveness checks via runChecks and returns its result: the first failure, or nil when none fail.
+func CheckLiveness(ctx context.Context) error {
+       return runChecks(ctx, &livenessMu, livenessChecks)
+}
+
+// CheckReadiness runs the registered readiness checks via runChecks and returns its result: the first failure, or nil when none fail.
+func CheckReadiness(ctx context.Context) error {
+       return runChecks(ctx, &readinessMu, readinessChecks)
+}
+
+// CheckStartup runs the registered startup checks via runChecks and returns its result: the first failure, or nil when none fail.
+func CheckStartup(ctx context.Context) error {
+       return runChecks(ctx, &startupMu, startupChecks)
+}
diff --git a/metrics/probe/probe_test.go b/metrics/probe/probe_test.go
new file mode 100644
index 000000000..6c2a499d5
--- /dev/null
+++ b/metrics/probe/probe_test.go
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+       "context"
+       "errors"
+       "testing"
+)
+
+// resetProbeState is a test helper that replaces the three check registries
+// with empty maps and sets the internal-state, ready and startup flags back
+// to false. It does not touch startOnce.
+func resetProbeState() {
+       livenessMu.Lock()
+       livenessChecks = map[string]CheckFunc{}
+       livenessMu.Unlock()
+
+       readinessMu.Lock()
+       readinessChecks = map[string]CheckFunc{}
+       readinessMu.Unlock()
+
+       startupMu.Lock()
+       startupChecks = map[string]CheckFunc{}
+       startupMu.Unlock()
+
+       internalStateEnabled.Store(false)
+       readyFlag.Store(false)
+       startupFlag.Store(false)
+}
+
+// TestRegisterAndCheckLiveness verifies that registrations with an empty name
+// or nil func are dropped, that a passing check makes CheckLiveness return
+// nil, and that adding a failing check makes it return an error.
+func TestRegisterAndCheckLiveness(t *testing.T) {
+       resetProbeState()
+
+       RegisterLiveness("", func(context.Context) error { return nil })
+       RegisterLiveness("nil", nil)
+
+       livenessMu.RLock()
+       if got := len(livenessChecks); got != 0 {
+               livenessMu.RUnlock()
+               t.Fatalf("expected 0 liveness checks, got %d", got)
+       }
+       livenessMu.RUnlock()
+
+       RegisterLiveness("ok", func(context.Context) error { return nil })
+       if err := CheckLiveness(context.Background()); err != nil {
+               t.Fatalf("expected liveness ok, got %v", err)
+       }
+
+       RegisterLiveness("fail", func(context.Context) error { return 
errors.New("boom") })
+       if err := CheckLiveness(context.Background()); err == nil {
+               t.Fatalf("expected liveness error, got nil")
+       }
+}
+
+// TestCheckReadinessAndStartupDefault verifies that readiness and startup
+// report success when no checks are registered.
+func TestCheckReadinessAndStartupDefault(t *testing.T) {
+       resetProbeState()
+
+       if err := CheckReadiness(context.Background()); err != nil {
+               t.Fatalf("expected readiness ok with no checks, got %v", err)
+       }
+       if err := CheckStartup(context.Background()); err != nil {
+               t.Fatalf("expected startup ok with no checks, got %v", err)
+       }
+}
+
+// TestInternalStateFlags verifies that, with internal state enabled, the
+// ready/startup helpers follow their flags, and that disabling internal state
+// makes both helpers report true.
+func TestInternalStateFlags(t *testing.T) {
+       resetProbeState()
+
+       EnableInternalState(true)
+       if internalReady() {
+               t.Fatalf("expected internalReady false by default")
+       }
+       if internalStartup() {
+               t.Fatalf("expected internalStartup false by default")
+       }
+
+       SetReady(true)
+       SetStartupComplete(true)
+       if !internalReady() {
+               t.Fatalf("expected internalReady true after SetReady(true)")
+       }
+       if !internalStartup() {
+               t.Fatalf("expected internalStartup true after 
SetStartupComplete(true)")
+       }
+
+       EnableInternalState(false)
+       if !internalReady() || !internalStartup() {
+               t.Fatalf("expected internal state checks bypassed when 
disabled")
+       }
+}
diff --git a/metrics/probe/server.go b/metrics/probe/server.go
new file mode 100644
index 000000000..f327bb8da
--- /dev/null
+++ b/metrics/probe/server.go
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+       "context"
+       "net/http"
+       "strconv"
+       "sync"
+       "time"
+)
+
+import (
+       "github.com/dubbogo/gost/log/logger"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/common/constant"
+       "dubbo.apache.org/dubbo-go/v3/common/extension"
+       "dubbo.apache.org/dubbo-go/v3/global"
+)
+
+// Config is the resolved probe server configuration consumed by Init. Unlike
+// global.ProbeConfig it uses plain bool fields rather than *bool.
+type Config struct {
+       Enabled          bool
+       Port             string
+       LivenessPath     string
+       ReadinessPath    string
+       StartupPath      string
+       UseInternalState bool
+}
+
+var (
+       startOnce sync.Once
+)
+
+// Init starts the probe HTTP server described by cfg. It does nothing when cfg
+// is nil or disabled; otherwise the setup runs at most once (guarded by
+// startOnce), so later calls with a different cfg have no effect.
+//
+// When cfg.UseInternalState is set, the internal ready/startup flags are
+// reset to false and "internal" readiness and startup checks are registered
+// against them. Handlers are mounted only for non-empty paths. The server
+// listens on ":"+cfg.Port in a background goroutine, and a shutdown callback
+// is registered that marks the process not ready and shuts the server down
+// with a 5-second timeout. Listen errors are logged, not returned.
+func Init(cfg *Config) {
+       if cfg == nil || !cfg.Enabled {
+               return
+       }
+       startOnce.Do(func() {
+               if cfg.UseInternalState {
+                       // Start from "not ready / not started".
+                       EnableInternalState(true)
+                       SetReady(false)
+                       SetStartupComplete(false)
+                       RegisterReadiness("internal", func(ctx context.Context) 
error {
+                               if internalReady() {
+                                       return nil
+                               }
+                               return errNotReady
+                       })
+                       RegisterStartup("internal", func(ctx context.Context) 
error {
+                               if internalStartup() {
+                                       return nil
+                               }
+                               return errNotStarted
+                       })
+               }
+
+               // An empty path leaves that endpoint unregistered.
+               mux := http.NewServeMux()
+               if cfg.LivenessPath != "" {
+                       mux.HandleFunc(cfg.LivenessPath, livenessHandler)
+               }
+               if cfg.ReadinessPath != "" {
+                       mux.HandleFunc(cfg.ReadinessPath, readinessHandler)
+               }
+               if cfg.StartupPath != "" {
+                       mux.HandleFunc(cfg.StartupPath, startupHandler)
+               }
+               srv := &http.Server{
+                       Addr:              ":" + cfg.Port,
+                       Handler:           mux,
+                       ReadHeaderTimeout: constant.ProbeReadHeaderTimeout,
+                       WriteTimeout:      constant.ProbeWriteTimeout,
+                       IdleTimeout:       constant.ProbeIdleTimeout,
+               }
+               extension.AddCustomShutdownCallback(func() {
+                       // Clear the ready flag before stopping the server.
+                       SetReady(false)
+                       ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second)
+                       defer cancel()
+                       if err := srv.Shutdown(ctx); err != nil {
+                               logger.Errorf("[kubernetes probe] probe server 
shutdown failed: %v", err)
+                       }
+               })
+
+               go func() {
+                       logger.Infof("[kubernetes probe] probe server listening 
on :%s", cfg.Port)
+                       // http.ErrServerClosed is the normal result of Shutdown.
+                       if err := srv.ListenAndServe(); err != nil && err != 
http.ErrServerClosed {
+                               logger.Errorf("[kubernetes probe] probe server 
stopped with error: %v", err)
+                       }
+               }()
+       })
+}
+
+func BuildProbeConfig(probeCfg *global.ProbeConfig) *Config {
+       if probeCfg == nil || probeCfg.Enabled == nil || !*probeCfg.Enabled {
+               return nil
+       }
+
+       useInternal := probeCfg.UseInternalState == nil || 
*probeCfg.UseInternalState
+       port := probeCfg.Port
+
+       if port == "" {
+               port = constant.ProbeDefaultPort
+       } else if p, err := strconv.Atoi(port); p < 1 || p > 65535 || err != 
nil {
+               logger.Error("[kubernetes probe] unsupported probe server port, 
set to default ", constant.ProbeDefaultPort)
+       }
+
+       livenessPath := probeCfg.LivenessPath
+       if livenessPath == "" {
+               livenessPath = constant.ProbeDefaultLivenessPath
+       }
+       readinessPath := probeCfg.ReadinessPath
+       if readinessPath == "" {
+               readinessPath = constant.ProbeDefaultReadinessPath
+       }
+       startupPath := probeCfg.StartupPath
+       if startupPath == "" {
+               startupPath = constant.ProbeDefaultStartupPath
+       }
+
+       return &Config{
+               Enabled:          true,
+               Port:             port,
+               LivenessPath:     livenessPath,
+               ReadinessPath:    readinessPath,
+               StartupPath:      startupPath,
+               UseInternalState: useInternal,
+       }
+}
diff --git a/metrics/probe/state.go b/metrics/probe/state.go
new file mode 100644
index 000000000..31a098028
--- /dev/null
+++ b/metrics/probe/state.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+       "sync/atomic"
+)
+
+var (
+       internalStateEnabled atomic.Bool
+       readyFlag            atomic.Bool
+       startupFlag          atomic.Bool
+)
+
+// EnableInternalState controls whether readiness/startup checks
+// should validate internal state flags. While disabled, internalReady and internalStartup report true regardless of the flags.
+func EnableInternalState(enabled bool) {
+       internalStateEnabled.Store(enabled)
+}
+
+// SetReady stores ready in the flag read by the internal readiness check. The flag is only consulted while internal state is enabled.
+func SetReady(ready bool) {
+       readyFlag.Store(ready)
+}
+
+// SetStartupComplete stores ready in the flag read by the internal startup check. The flag is only consulted while internal state is enabled.
+func SetStartupComplete(ready bool) {
+       startupFlag.Store(ready)
+}
+
+// internalReady reports the readiness flag when internal state is enabled,
+// and true otherwise.
+func internalReady() bool {
+       if !internalStateEnabled.Load() {
+               return true
+       }
+       return readyFlag.Load()
+}
+
+// internalStartup reports the startup flag when internal state is enabled,
+// and true otherwise.
+func internalStartup() bool {
+       if !internalStateEnabled.Load() {
+               return true
+       }
+       return startupFlag.Load()
+}
diff --git a/metrics/probe/state_test.go b/metrics/probe/state_test.go
new file mode 100644
index 000000000..0c1acdc50
--- /dev/null
+++ b/metrics/probe/state_test.go
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+       "testing"
+)
+
+// TestStateDefaultsAndToggle verifies that both internal helpers report true
+// while internal state is disabled, false once it is enabled with the flags
+// unset, and true again after SetReady(true) and SetStartupComplete(true).
+func TestStateDefaultsAndToggle(t *testing.T) {
+       resetProbeState()
+
+       if internalReady() != true {
+               t.Fatalf("expected internalReady true when internal state 
disabled")
+       }
+       if internalStartup() != true {
+               t.Fatalf("expected internalStartup true when internal state 
disabled")
+       }
+
+       EnableInternalState(true)
+       if internalReady() {
+               t.Fatalf("expected internalReady false when enabled and not 
ready")
+       }
+       if internalStartup() {
+               t.Fatalf("expected internalStartup false when enabled and not 
started")
+       }
+
+       SetReady(true)
+       SetStartupComplete(true)
+       if !internalReady() {
+               t.Fatalf("expected internalReady true after SetReady(true)")
+       }
+       if !internalStartup() {
+               t.Fatalf("expected internalStartup true after 
SetStartupComplete(true)")
+       }
+}
diff --git a/server/options.go b/server/options.go
index 53aea1880..fb064a46d 100644
--- a/server/options.go
+++ b/server/options.go
@@ -42,6 +42,7 @@ import (
        aslimiter "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter"
        "dubbo.apache.org/dubbo-go/v3/global"
        "dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
+       "dubbo.apache.org/dubbo-go/v3/metrics/probe"
        "dubbo.apache.org/dubbo-go/v3/protocol"
        "dubbo.apache.org/dubbo-go/v3/protocol/base"
        "dubbo.apache.org/dubbo-go/v3/registry"
@@ -107,6 +108,11 @@ func (srvOpts *ServerOptions) init(opts ...ServerOption) 
error {
        // init graceful_shutdown
        
graceful_shutdown.Init(graceful_shutdown.SetShutdownConfig(srvOpts.Shutdown))
 
+       // init probe
+       if probeCfg := probe.BuildProbeConfig(srvOpts.Metrics.Probe); probeCfg 
!= nil {
+               probe.Init(probeCfg)
+       }
+
        return nil
 }
 
diff --git a/server/server.go b/server/server.go
index 9fde10bcd..c4bba3950 100644
--- a/server/server.go
+++ b/server/server.go
@@ -38,6 +38,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/dubboutil"
        "dubbo.apache.org/dubbo-go/v3/metadata"
+       "dubbo.apache.org/dubbo-go/v3/metrics/probe"
        "dubbo.apache.org/dubbo-go/v3/registry/exposed_tmp"
 )
 
@@ -339,6 +340,11 @@ func (s *Server) Serve() error {
        if err := exposed_tmp.RegisterServiceInstance(); err != nil {
                return err
        }
+
+       // k8s probe ready
+       probe.SetStartupComplete(true)
+       probe.SetReady(true)
+
        select {}
 }
 

Reply via email to