This is an automated email from the ASF dual-hosted git repository.
xuetaoli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new e7770c9ef feat(probe): add Kubernetes probe support with liveness,
readiness, and startup checks (#3213)
e7770c9ef is described below
commit e7770c9effa4ca02cb9a69c45ba1124d3325a5c6
Author: Xuetao Li <[email protected]>
AuthorDate: Fri Mar 13 11:50:44 2026 +0800
feat(probe): add Kubernetes probe support with liveness, readiness, and
startup checks (#3213)
* feat(probe): add Kubernetes probe support with liveness, readiness, and
startup checks
---
common/constant/key.go | 7 +++
common/constant/metric.go | 11 ++++
compat.go | 30 +++++++++
config/metric_config.go | 10 +++
global/metric_config.go | 43 ++++++++++++-
metrics/options.go | 38 ++++++++++++
metrics/probe/http.go | 73 ++++++++++++++++++++++
metrics/probe/http_test.go | 84 ++++++++++++++++++++++++++
metrics/probe/probe.go | 98 ++++++++++++++++++++++++++++++
metrics/probe/probe_test.go | 103 +++++++++++++++++++++++++++++++
metrics/probe/server.go | 144 ++++++++++++++++++++++++++++++++++++++++++++
metrics/probe/state.go | 58 ++++++++++++++++++
metrics/probe/state_test.go | 50 +++++++++++++++
server/options.go | 6 ++
server/server.go | 6 ++
15 files changed, 760 insertions(+), 1 deletion(-)
diff --git a/common/constant/key.go b/common/constant/key.go
index d2dd1ebdf..1880faf67 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -476,6 +476,13 @@ const (
PrometheusPushgatewayPasswordKey = "prometheus.pushgateway.password"
PrometheusPushgatewayPushIntervalKey =
"prometheus.pushgateway.push.interval"
PrometheusPushgatewayJobKey = "prometheus.pushgateway.job"
+
+ ProbeEnabledKey = "probe.enabled"
+ ProbePortKey = "probe.port"
+ ProbeLivenessPathKey = "probe.liveness.path"
+ ProbeReadinessPathKey = "probe.readiness.path"
+ ProbeStartupPathKey = "probe.startup.path"
+ ProbeUseInternalStateKey = "probe.use-internal-state"
)
// default meta cache config
diff --git a/common/constant/metric.go b/common/constant/metric.go
index 4b35721eb..212b55517 100644
--- a/common/constant/metric.go
+++ b/common/constant/metric.go
@@ -17,6 +17,10 @@
package constant
+import (
+ "time"
+)
+
// metrics type
const (
MetricsRegistry = "dubbo.metrics.registry"
@@ -53,6 +57,13 @@ const (
PrometheusDefaultMetricsPort = "9090"
PrometheusDefaultPushInterval = 30
PrometheusDefaultJobName = "default_dubbo_job"
+ ProbeDefaultPort = "22222"
+ ProbeDefaultLivenessPath = "/live"
+ ProbeDefaultReadinessPath = "/ready"
+ ProbeDefaultStartupPath = "/startup"
+ ProbeReadHeaderTimeout = 5 * time.Second
+ ProbeWriteTimeout = 10 * time.Second
+ ProbeIdleTimeout = 30 * time.Second
MetricFilterStartTime = "metric_filter_start_time"
)
diff --git a/compat.go b/compat.go
index 7844ba5b1..5af9d3913 100644
--- a/compat.go
+++ b/compat.go
@@ -379,6 +379,21 @@ func compatMetricConfig(c *global.MetricsConfig)
*config.MetricsConfig {
EnableMetadata: c.EnableMetadata,
EnableRegistry: c.EnableRegistry,
EnableConfigCenter: c.EnableConfigCenter,
+ Probe: compatMetricProbeConfig(c.Probe),
+ }
+}
+
+func compatMetricProbeConfig(c *global.ProbeConfig) *config.ProbeConfig {
+ if c == nil {
+ return nil
+ }
+ return &config.ProbeConfig{
+ Enabled: c.Enabled,
+ Port: c.Port,
+ LivenessPath: c.LivenessPath,
+ ReadinessPath: c.ReadinessPath,
+ StartupPath: c.StartupPath,
+ UseInternalState: c.UseInternalState,
}
}
@@ -916,6 +931,21 @@ func compatGlobalMetricConfig(c *config.MetricsConfig)
*global.MetricsConfig {
EnableMetadata: c.EnableMetadata,
EnableRegistry: c.EnableRegistry,
EnableConfigCenter: c.EnableConfigCenter,
+ Probe: compatGlobalMetricProbeConfig(c.Probe),
+ }
+}
+
+func compatGlobalMetricProbeConfig(c *config.ProbeConfig) *global.ProbeConfig {
+ if c == nil {
+ return nil
+ }
+ return &global.ProbeConfig{
+ Enabled: c.Enabled,
+ Port: c.Port,
+ LivenessPath: c.LivenessPath,
+ ReadinessPath: c.ReadinessPath,
+ StartupPath: c.StartupPath,
+ UseInternalState: c.UseInternalState,
}
}
diff --git a/config/metric_config.go b/config/metric_config.go
index 628b0f6a1..12875d7d2 100644
--- a/config/metric_config.go
+++ b/config/metric_config.go
@@ -44,6 +44,7 @@ type MetricsConfig struct {
EnableConfigCenter *bool `default:"false"
yaml:"enable-config-center" json:"enable-config-center,omitempty"
property:"enable-config-center"`
Prometheus *PrometheusConfig `yaml:"prometheus"
json:"prometheus" property:"prometheus"`
Aggregation *AggregateConfig `yaml:"aggregation"
json:"aggregation" property:"aggregation"`
+ Probe *ProbeConfig `yaml:"probe" json:"probe"
property:"probe"`
rootConfig *RootConfig
}
@@ -58,6 +59,15 @@ type PrometheusConfig struct {
Pushgateway *PushgatewayConfig `yaml:"pushgateway"
json:"pushgateway,omitempty" property:"pushgateway"`
}
+type ProbeConfig struct {
+ Enabled *bool `default:"false" yaml:"enabled"
json:"enabled,omitempty" property:"enabled"`
+ Port string `default:"22222" yaml:"port"
json:"port,omitempty" property:"port"`
+ LivenessPath string `default:"/live" yaml:"liveness-path"
json:"liveness-path,omitempty" property:"liveness-path"`
+ ReadinessPath string `default:"/ready" yaml:"readiness-path"
json:"readiness-path,omitempty" property:"readiness-path"`
+ StartupPath string `default:"/startup" yaml:"startup-path"
json:"startup-path,omitempty" property:"startup-path"`
+ UseInternalState *bool `default:"true" yaml:"use-internal-state"
json:"use-internal-state,omitempty" property:"use-internal-state"`
+}
+
type Exporter struct {
Enabled *bool `default:"true" yaml:"enabled" json:"enabled,omitempty"
property:"enabled"`
}
diff --git a/global/metric_config.go b/global/metric_config.go
index 88f296463..401d00708 100644
--- a/global/metric_config.go
+++ b/global/metric_config.go
@@ -28,6 +28,7 @@ type MetricsConfig struct {
EnableMetadata *bool `default:"true"
yaml:"enable-metadata" json:"enable-metadata,omitempty"
property:"enable-metadata"`
EnableRegistry *bool `default:"true"
yaml:"enable-registry" json:"enable-registry,omitempty"
property:"enable-registry"`
EnableConfigCenter *bool `default:"true"
yaml:"enable-config-center" json:"enable-config-center,omitempty"
property:"enable-config-center"`
+ Probe *ProbeConfig `yaml:"probe" json:"probe"
property:"probe"`
}
type AggregateConfig struct {
@@ -41,6 +42,15 @@ type PrometheusConfig struct {
Pushgateway *PushgatewayConfig `yaml:"pushgateway"
json:"pushgateway,omitempty" property:"pushgateway"`
}
+type ProbeConfig struct {
+ Enabled *bool `default:"false" yaml:"enabled"
json:"enabled,omitempty" property:"enabled"`
+ Port string `default:"22222" yaml:"port"
json:"port,omitempty" property:"port"`
+ LivenessPath string `default:"/live" yaml:"liveness-path"
json:"liveness-path,omitempty" property:"liveness-path"`
+ ReadinessPath string `default:"/ready" yaml:"readiness-path"
json:"readiness-path,omitempty" property:"readiness-path"`
+ StartupPath string `default:"/startup" yaml:"startup-path"
json:"startup-path,omitempty" property:"startup-path"`
+ UseInternalState *bool `default:"true" yaml:"use-internal-state"
json:"use-internal-state,omitempty" property:"use-internal-state"`
+}
+
type Exporter struct {
Enabled *bool `default:"false" yaml:"enabled" json:"enabled,omitempty"
property:"enabled"`
}
@@ -57,7 +67,7 @@ type PushgatewayConfig struct {
func DefaultMetricsConfig() *MetricsConfig {
// return a new config without setting any field means there is not any
default value for initialization
- return &MetricsConfig{Prometheus: defaultPrometheusConfig(),
Aggregation: defaultAggregateConfig()}
+ return &MetricsConfig{Prometheus: defaultPrometheusConfig(),
Aggregation: defaultAggregateConfig(), Probe: defaultProbeConfig()}
}
// Clone a new MetricsConfig
@@ -100,6 +110,7 @@ func (c *MetricsConfig) Clone() *MetricsConfig {
EnableMetadata: newEnableMetadata,
EnableRegistry: newEnableRegistry,
EnableConfigCenter: newEnableConfigCenter,
+ Probe: c.Probe.Clone(),
}
}
@@ -155,6 +166,36 @@ func (c *PushgatewayConfig) Clone() *PushgatewayConfig {
}
}
+func defaultProbeConfig() *ProbeConfig {
+ return &ProbeConfig{}
+}
+
+func (c *ProbeConfig) Clone() *ProbeConfig {
+ if c == nil {
+ return nil
+ }
+
+ var newEnabled *bool
+ if c.Enabled != nil {
+ newEnabled = new(bool)
+ *newEnabled = *c.Enabled
+ }
+ var newUseInternalState *bool
+ if c.UseInternalState != nil {
+ newUseInternalState = new(bool)
+ *newUseInternalState = *c.UseInternalState
+ }
+
+ return &ProbeConfig{
+ Enabled: newEnabled,
+ Port: c.Port,
+ LivenessPath: c.LivenessPath,
+ ReadinessPath: c.ReadinessPath,
+ StartupPath: c.StartupPath,
+ UseInternalState: newUseInternalState,
+ }
+}
+
func defaultAggregateConfig() *AggregateConfig {
return &AggregateConfig{}
}
diff --git a/metrics/options.go b/metrics/options.go
index 58542cc57..d494e6a33 100644
--- a/metrics/options.go
+++ b/metrics/options.go
@@ -152,3 +152,41 @@ func WithPath(path string) Option {
opts.Metrics.Path = path
}
}
+
+// Below are options for probe
+func WithProbeEnabled() Option {
+ return func(opts *Options) {
+ b := true
+ opts.Metrics.Probe.Enabled = &b
+ }
+}
+
+func WithProbePort(port int) Option {
+ return func(opts *Options) {
+ opts.Metrics.Probe.Port = strconv.Itoa(port)
+ }
+}
+
+func WithProbeLivenessPath(path string) Option {
+ return func(opts *Options) {
+ opts.Metrics.Probe.LivenessPath = path
+ }
+}
+
+func WithProbeReadinessPath(path string) Option {
+ return func(opts *Options) {
+ opts.Metrics.Probe.ReadinessPath = path
+ }
+}
+
+func WithProbeStartupPath(path string) Option {
+ return func(opts *Options) {
+ opts.Metrics.Probe.StartupPath = path
+ }
+}
+
+func WithProbeUseInternalState(use bool) Option {
+ return func(opts *Options) {
+ opts.Metrics.Probe.UseInternalState = &use
+ }
+}
diff --git a/metrics/probe/http.go b/metrics/probe/http.go
new file mode 100644
index 000000000..ce18b0245
--- /dev/null
+++ b/metrics/probe/http.go
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+ "errors"
+ "net/http"
+)
+
+var (
+ errNotReady = errors.New("not ready")
+ errNotStarted = errors.New("not started")
+)
+
+func livenessHandler(w http.ResponseWriter, r *http.Request) {
+ // k8s only use GET
+ if r.Method != http.MethodGet {
+ http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ if err := CheckLiveness(r.Context()); err != nil {
+ http.Error(w, err.Error(), http.StatusServiceUnavailable)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte("ok"))
+}
+
+func readinessHandler(w http.ResponseWriter, r *http.Request) {
+ // k8s only use GET
+ if r.Method != http.MethodGet {
+ http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ if err := CheckReadiness(r.Context()); err != nil {
+ http.Error(w, err.Error(), http.StatusServiceUnavailable)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte("ok"))
+}
+
+func startupHandler(w http.ResponseWriter, r *http.Request) {
+ // k8s only use GET
+ if r.Method != http.MethodGet {
+ http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ if err := CheckStartup(r.Context()); err != nil {
+ http.Error(w, err.Error(), http.StatusServiceUnavailable)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte("ok"))
+}
diff --git a/metrics/probe/http_test.go b/metrics/probe/http_test.go
new file mode 100644
index 000000000..158386031
--- /dev/null
+++ b/metrics/probe/http_test.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+ "context"
+ "errors"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+)
+
+func TestHandlersSuccess(t *testing.T) {
+ resetProbeState()
+
+ RegisterLiveness("ok", func(context.Context) error { return nil })
+ RegisterReadiness("ok", func(context.Context) error { return nil })
+ RegisterStartup("ok", func(context.Context) error { return nil })
+
+ req := httptest.NewRequest(http.MethodGet, "/live", nil)
+ rec := httptest.NewRecorder()
+ livenessHandler(rec, req)
+ if rec.Code != http.StatusOK {
+ t.Fatalf("expected 200 for liveness, got %d", rec.Code)
+ }
+
+ req = httptest.NewRequest(http.MethodGet, "/ready", nil)
+ rec = httptest.NewRecorder()
+ readinessHandler(rec, req)
+ if rec.Code != http.StatusOK {
+ t.Fatalf("expected 200 for readiness, got %d", rec.Code)
+ }
+
+ req = httptest.NewRequest(http.MethodGet, "/startup", nil)
+ rec = httptest.NewRecorder()
+ startupHandler(rec, req)
+ if rec.Code != http.StatusOK {
+ t.Fatalf("expected 200 for startup, got %d", rec.Code)
+ }
+}
+
+func TestHandlersFailure(t *testing.T) {
+ resetProbeState()
+
+ RegisterLiveness("fail", func(context.Context) error { return
errors.New("bad") })
+ RegisterReadiness("fail", func(context.Context) error { return
errors.New("bad") })
+ RegisterStartup("fail", func(context.Context) error { return
errors.New("bad") })
+
+ req := httptest.NewRequest(http.MethodGet, "/live", nil)
+ rec := httptest.NewRecorder()
+ livenessHandler(rec, req)
+ if rec.Code != http.StatusServiceUnavailable {
+ t.Fatalf("expected 503 for liveness, got %d", rec.Code)
+ }
+
+ req = httptest.NewRequest(http.MethodGet, "/ready", nil)
+ rec = httptest.NewRecorder()
+ readinessHandler(rec, req)
+ if rec.Code != http.StatusServiceUnavailable {
+ t.Fatalf("expected 503 for readiness, got %d", rec.Code)
+ }
+
+ req = httptest.NewRequest(http.MethodGet, "/startup", nil)
+ rec = httptest.NewRecorder()
+ startupHandler(rec, req)
+ if rec.Code != http.StatusServiceUnavailable {
+ t.Fatalf("expected 503 for startup, got %d", rec.Code)
+ }
+}
diff --git a/metrics/probe/probe.go b/metrics/probe/probe.go
new file mode 100644
index 000000000..0552ce957
--- /dev/null
+++ b/metrics/probe/probe.go
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+ "context"
+ "fmt"
+ "sync"
+)
+
+// CheckFunc returns nil when healthy.
+type CheckFunc func(context.Context) error
+
+var (
+ livenessMu sync.RWMutex
+ readinessMu sync.RWMutex
+ startupMu sync.RWMutex
+
+ livenessChecks = make(map[string]CheckFunc)
+ readinessChecks = make(map[string]CheckFunc)
+ startupChecks = make(map[string]CheckFunc)
+)
+
+// RegisterLiveness registers a liveness check.
+func RegisterLiveness(name string, fn CheckFunc) {
+ if name == "" || fn == nil {
+ return
+ }
+ livenessMu.Lock()
+ defer livenessMu.Unlock()
+ livenessChecks[name] = fn
+}
+
+// RegisterReadiness registers a readiness check.
+func RegisterReadiness(name string, fn CheckFunc) {
+ if name == "" || fn == nil {
+ return
+ }
+ readinessMu.Lock()
+ defer readinessMu.Unlock()
+ readinessChecks[name] = fn
+}
+
+// RegisterStartup registers a startup check.
+func RegisterStartup(name string, fn CheckFunc) {
+ if name == "" || fn == nil {
+ return
+ }
+ startupMu.Lock()
+ defer startupMu.Unlock()
+ startupChecks[name] = fn
+}
+
+func runChecks(ctx context.Context, mu *sync.RWMutex, checks
map[string]CheckFunc) error {
+ mu.RLock()
+ snapshot := make(map[string]CheckFunc, len(checks))
+ for k, v := range checks {
+ snapshot[k] = v
+ }
+ mu.RUnlock()
+
+ for name, fn := range snapshot {
+ if err := fn(ctx); err != nil {
+ return fmt.Errorf("probe %s: %w", name, err)
+ }
+ }
+ return nil
+}
+
+// CheckLiveness evaluates all liveness checks.
+func CheckLiveness(ctx context.Context) error {
+ return runChecks(ctx, &livenessMu, livenessChecks)
+}
+
+// CheckReadiness evaluates all readiness checks.
+func CheckReadiness(ctx context.Context) error {
+ return runChecks(ctx, &readinessMu, readinessChecks)
+}
+
+// CheckStartup evaluates all startup checks.
+func CheckStartup(ctx context.Context) error {
+ return runChecks(ctx, &startupMu, startupChecks)
+}
diff --git a/metrics/probe/probe_test.go b/metrics/probe/probe_test.go
new file mode 100644
index 000000000..6c2a499d5
--- /dev/null
+++ b/metrics/probe/probe_test.go
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+ "context"
+ "errors"
+ "testing"
+)
+
+func resetProbeState() {
+ livenessMu.Lock()
+ livenessChecks = map[string]CheckFunc{}
+ livenessMu.Unlock()
+
+ readinessMu.Lock()
+ readinessChecks = map[string]CheckFunc{}
+ readinessMu.Unlock()
+
+ startupMu.Lock()
+ startupChecks = map[string]CheckFunc{}
+ startupMu.Unlock()
+
+ internalStateEnabled.Store(false)
+ readyFlag.Store(false)
+ startupFlag.Store(false)
+}
+
+func TestRegisterAndCheckLiveness(t *testing.T) {
+ resetProbeState()
+
+ RegisterLiveness("", func(context.Context) error { return nil })
+ RegisterLiveness("nil", nil)
+
+ livenessMu.RLock()
+ if got := len(livenessChecks); got != 0 {
+ livenessMu.RUnlock()
+ t.Fatalf("expected 0 liveness checks, got %d", got)
+ }
+ livenessMu.RUnlock()
+
+ RegisterLiveness("ok", func(context.Context) error { return nil })
+ if err := CheckLiveness(context.Background()); err != nil {
+ t.Fatalf("expected liveness ok, got %v", err)
+ }
+
+ RegisterLiveness("fail", func(context.Context) error { return
errors.New("boom") })
+ if err := CheckLiveness(context.Background()); err == nil {
+ t.Fatalf("expected liveness error, got nil")
+ }
+}
+
+func TestCheckReadinessAndStartupDefault(t *testing.T) {
+ resetProbeState()
+
+ if err := CheckReadiness(context.Background()); err != nil {
+ t.Fatalf("expected readiness ok with no checks, got %v", err)
+ }
+ if err := CheckStartup(context.Background()); err != nil {
+ t.Fatalf("expected startup ok with no checks, got %v", err)
+ }
+}
+
+func TestInternalStateFlags(t *testing.T) {
+ resetProbeState()
+
+ EnableInternalState(true)
+ if internalReady() {
+ t.Fatalf("expected internalReady false by default")
+ }
+ if internalStartup() {
+ t.Fatalf("expected internalStartup false by default")
+ }
+
+ SetReady(true)
+ SetStartupComplete(true)
+ if !internalReady() {
+ t.Fatalf("expected internalReady true after SetReady(true)")
+ }
+ if !internalStartup() {
+ t.Fatalf("expected internalStartup true after
SetStartupComplete(true)")
+ }
+
+ EnableInternalState(false)
+ if !internalReady() || !internalStartup() {
+ t.Fatalf("expected internal state checks bypassed when
disabled")
+ }
+}
diff --git a/metrics/probe/server.go b/metrics/probe/server.go
new file mode 100644
index 000000000..f327bb8da
--- /dev/null
+++ b/metrics/probe/server.go
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+ "context"
+ "net/http"
+ "strconv"
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/dubbogo/gost/log/logger"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/global"
+)
+
+type Config struct {
+ Enabled bool
+ Port string
+ LivenessPath string
+ ReadinessPath string
+ StartupPath string
+ UseInternalState bool
+}
+
+var (
+ startOnce sync.Once
+)
+
+func Init(cfg *Config) {
+ if cfg == nil || !cfg.Enabled {
+ return
+ }
+ startOnce.Do(func() {
+ if cfg.UseInternalState {
+ EnableInternalState(true)
+ SetReady(false)
+ SetStartupComplete(false)
+ RegisterReadiness("internal", func(ctx context.Context)
error {
+ if internalReady() {
+ return nil
+ }
+ return errNotReady
+ })
+ RegisterStartup("internal", func(ctx context.Context)
error {
+ if internalStartup() {
+ return nil
+ }
+ return errNotStarted
+ })
+ }
+
+ mux := http.NewServeMux()
+ if cfg.LivenessPath != "" {
+ mux.HandleFunc(cfg.LivenessPath, livenessHandler)
+ }
+ if cfg.ReadinessPath != "" {
+ mux.HandleFunc(cfg.ReadinessPath, readinessHandler)
+ }
+ if cfg.StartupPath != "" {
+ mux.HandleFunc(cfg.StartupPath, startupHandler)
+ }
+ srv := &http.Server{
+ Addr: ":" + cfg.Port,
+ Handler: mux,
+ ReadHeaderTimeout: constant.ProbeReadHeaderTimeout,
+ WriteTimeout: constant.ProbeWriteTimeout,
+ IdleTimeout: constant.ProbeIdleTimeout,
+ }
+ extension.AddCustomShutdownCallback(func() {
+ SetReady(false)
+ ctx, cancel :=
context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := srv.Shutdown(ctx); err != nil {
+ logger.Errorf("[kubernetes probe] probe server
shutdown failed: %v", err)
+ }
+ })
+
+ go func() {
+ logger.Infof("[kubernetes probe] probe server listening
on :%s", cfg.Port)
+ if err := srv.ListenAndServe(); err != nil && err !=
http.ErrServerClosed {
+ logger.Errorf("[kubernetes probe] probe server
stopped with error: %v", err)
+ }
+ }()
+ })
+}
+
+func BuildProbeConfig(probeCfg *global.ProbeConfig) *Config {
+ if probeCfg == nil || probeCfg.Enabled == nil || !*probeCfg.Enabled {
+ return nil
+ }
+
+ useInternal := probeCfg.UseInternalState == nil ||
*probeCfg.UseInternalState
+ port := probeCfg.Port
+
+ if port == "" {
+ port = constant.ProbeDefaultPort
+ } else if p, err := strconv.Atoi(port); p < 1 || p > 65535 || err !=
nil {
+ logger.Error("[kubernetes probe] unsupported probe server port,
set to default ", constant.ProbeDefaultPort)
+ }
+
+ livenessPath := probeCfg.LivenessPath
+ if livenessPath == "" {
+ livenessPath = constant.ProbeDefaultLivenessPath
+ }
+ readinessPath := probeCfg.ReadinessPath
+ if readinessPath == "" {
+ readinessPath = constant.ProbeDefaultReadinessPath
+ }
+ startupPath := probeCfg.StartupPath
+ if startupPath == "" {
+ startupPath = constant.ProbeDefaultStartupPath
+ }
+
+ return &Config{
+ Enabled: true,
+ Port: port,
+ LivenessPath: livenessPath,
+ ReadinessPath: readinessPath,
+ StartupPath: startupPath,
+ UseInternalState: useInternal,
+ }
+}
diff --git a/metrics/probe/state.go b/metrics/probe/state.go
new file mode 100644
index 000000000..31a098028
--- /dev/null
+++ b/metrics/probe/state.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+ "sync/atomic"
+)
+
+var (
+ internalStateEnabled atomic.Bool
+ readyFlag atomic.Bool
+ startupFlag atomic.Bool
+)
+
+// EnableInternalState controls whether readiness/startup checks
+// should validate internal state flags.
+func EnableInternalState(enabled bool) {
+ internalStateEnabled.Store(enabled)
+}
+
+// SetReady sets readiness state used by the internal readiness check.
+func SetReady(ready bool) {
+ readyFlag.Store(ready)
+}
+
+// SetStartupComplete sets startup state used by the internal startup check.
+func SetStartupComplete(ready bool) {
+ startupFlag.Store(ready)
+}
+
+func internalReady() bool {
+ if !internalStateEnabled.Load() {
+ return true
+ }
+ return readyFlag.Load()
+}
+
+func internalStartup() bool {
+ if !internalStateEnabled.Load() {
+ return true
+ }
+ return startupFlag.Load()
+}
diff --git a/metrics/probe/state_test.go b/metrics/probe/state_test.go
new file mode 100644
index 000000000..0c1acdc50
--- /dev/null
+++ b/metrics/probe/state_test.go
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package probe
+
+import (
+ "testing"
+)
+
+func TestStateDefaultsAndToggle(t *testing.T) {
+ resetProbeState()
+
+ if internalReady() != true {
+ t.Fatalf("expected internalReady true when internal state
disabled")
+ }
+ if internalStartup() != true {
+ t.Fatalf("expected internalStartup true when internal state
disabled")
+ }
+
+ EnableInternalState(true)
+ if internalReady() {
+ t.Fatalf("expected internalReady false when enabled and not
ready")
+ }
+ if internalStartup() {
+ t.Fatalf("expected internalStartup false when enabled and not
started")
+ }
+
+ SetReady(true)
+ SetStartupComplete(true)
+ if !internalReady() {
+ t.Fatalf("expected internalReady true after SetReady(true)")
+ }
+ if !internalStartup() {
+ t.Fatalf("expected internalStartup true after
SetStartupComplete(true)")
+ }
+}
diff --git a/server/options.go b/server/options.go
index 53aea1880..fb064a46d 100644
--- a/server/options.go
+++ b/server/options.go
@@ -42,6 +42,7 @@ import (
aslimiter "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter"
"dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
+ "dubbo.apache.org/dubbo-go/v3/metrics/probe"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/base"
"dubbo.apache.org/dubbo-go/v3/registry"
@@ -107,6 +108,11 @@ func (srvOpts *ServerOptions) init(opts ...ServerOption)
error {
// init graceful_shutdown
graceful_shutdown.Init(graceful_shutdown.SetShutdownConfig(srvOpts.Shutdown))
+ // init probe
+ if probeCfg := probe.BuildProbeConfig(srvOpts.Metrics.Probe); probeCfg
!= nil {
+ probe.Init(probeCfg)
+ }
+
return nil
}
diff --git a/server/server.go b/server/server.go
index 9fde10bcd..c4bba3950 100644
--- a/server/server.go
+++ b/server/server.go
@@ -38,6 +38,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/dubboutil"
"dubbo.apache.org/dubbo-go/v3/metadata"
+ "dubbo.apache.org/dubbo-go/v3/metrics/probe"
"dubbo.apache.org/dubbo-go/v3/registry/exposed_tmp"
)
@@ -339,6 +340,11 @@ func (s *Server) Serve() error {
if err := exposed_tmp.RegisterServiceInstance(); err != nil {
return err
}
+
+ // k8s probe ready
+ probe.SetStartupComplete(true)
+ probe.SetReady(true)
+
select {}
}