This is an automated email from the ASF dual-hosted git repository.
youling1128 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/dev by this push:
new 4a07d7e6 Add readiness probe api (#1497)
4a07d7e6 is described below
commit 4a07d7e6e7264f811ca0ed4321a141296b37f1cb
Author: humingcheng <[email protected]>
AuthorDate: Sat Jan 18 15:13:07 2025 +0800
Add readiness probe api (#1497)
---
docs/openapi/v4.yaml | 26 ++++++++++++
pkg/protect/checker.go | 42 ++++++++++++++++++++
pkg/protect/checker_test.go | 15 +++++++
pkg/protect/protect.go | 59 +++++++++++++++++++++-------
pkg/protect/protect_test.go | 23 +++++------
server/alarm/common.go | 1 +
server/health/health.go | 23 ++++++++++-
server/interceptor/interceptors_test.go | 2 +-
server/plugin/auth/buildin/buildin_test.go | 3 +-
server/resource/disco/instance_resource.go | 9 ++---
server/rest/controller/v4/main_controller.go | 12 ++++++
server/service/rbac/rbac.go | 1 +
server/service/registry/health.go | 16 ++++++++
server/service/registry/health_test.go | 14 +++++++
server/service/registry/registry.go | 6 +++
15 files changed, 217 insertions(+), 35 deletions(-)
diff --git a/docs/openapi/v4.yaml b/docs/openapi/v4.yaml
index 5fad67c4..d019ed6c 100644
--- a/docs/openapi/v4.yaml
+++ b/docs/openapi/v4.yaml
@@ -83,6 +83,32 @@ paths:
description: 内部错误
schema:
$ref: '#/definitions/Error'
+ /v4/{project}/registry/health/readiness:
+ get:
+ description: |
+ 服务中心readiness探测接口,确认服务中心是否可以接收请求。
+ parameters:
+ - name: x-domain-name
+ in: header
+ type: string
+ default: default
+ - name: project
+ in: path
+ required: true
+ type: string
+ tags:
+ - base
+ responses:
+ 200:
+ description: 状态正常,可以接收请求
+ 400:
+ description: 错误的请求
+ schema:
+ $ref: '#/definitions/Error'
+ 500:
+ description: 内部错误
+ schema:
+ $ref: '#/definitions/Error'
/v4/{project}/registry/microservices/{serviceId}:
get:
description: |
diff --git a/pkg/protect/checker.go b/pkg/protect/checker.go
new file mode 100644
index 00000000..fbbc85ec
--- /dev/null
+++ b/pkg/protect/checker.go
@@ -0,0 +1,50 @@
+package protect
+
+import (
+ "sync"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+)
+
+// ProtectionChecker decides whether an empty instance list should be protected,
+// i.e. answered with the protection status code instead of an empty result.
+type ProtectionChecker interface {
+ // CheckProtection returns true while protection is active.
+ CheckProtection() bool
+}
+
+// DelayedStopProtectChecker keeps protection on for a fixed delay measured from
+// its creation and reports it off afterwards; the stop is logged only once.
+type DelayedStopProtectChecker struct {
+ delay time.Duration // how long protection stays on
+ start time.Time // creation time; the delay is measured from here
+ logWhenStop sync.Once // makes the "stopped" message appear a single time
+}
+
+// NewDelayedSuccessChecker returns a DelayedStopProtectChecker whose protection
+// starts now and ends once delay has elapsed.
+func NewDelayedSuccessChecker(delay time.Duration) *DelayedStopProtectChecker {
+ checker := &DelayedStopProtectChecker{delay: delay}
+ checker.start = time.Now()
+ return checker
+}
+
+// CheckProtection reports true until more than delay has elapsed since start.
+// No method mutates the fields after construction and the one-time log goes
+// through sync.Once, so concurrent calls are safe.
+func (d *DelayedStopProtectChecker) CheckProtection() bool {
+ if elapsed := time.Since(d.start); elapsed <= d.delay {
+ return true
+ }
+ d.logWhenStop.Do(func() { log.Info("null instance protection stopped") })
+ return false
+}
+
+// GetGlobalProtectionChecker returns the package-level checker; nil means
+// protection is always on.
+// NOTE(review): this reads globalProtectionChecker without synchronization,
+// while watch() in protect.go may replace it concurrently.
+func GetGlobalProtectionChecker() ProtectionChecker {
+ return globalProtectionChecker
+}
diff --git a/pkg/protect/checker_test.go b/pkg/protect/checker_test.go
new file mode 100644
index 00000000..f6c16b9b
--- /dev/null
+++ b/pkg/protect/checker_test.go
@@ -0,0 +1,16 @@
+package protect
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestDelayedStopProtectChecker_CheckProtection(t *testing.T) {
+ p := NewDelayedSuccessChecker(100 * time.Millisecond)
+ assert.True(t, p.CheckProtection())
+ // sleep well past the delay so CheckProtection reliably reports false
+ time.Sleep(2 * p.delay)
+ assert.False(t, p.CheckProtection())
+}
diff --git a/pkg/protect/protect.go b/pkg/protect/protect.go
index 1649b3e4..a35b3842 100644
--- a/pkg/protect/protect.go
+++ b/pkg/protect/protect.go
@@ -1,13 +1,17 @@
package protect
import (
+ "context"
"fmt"
"net/http"
+ "sync"
"time"
- "github.com/apache/servicecomb-service-center/server/config"
+ "github.com/go-chassis/foundation/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/config"
+ "github.com/apache/servicecomb-service-center/server/service/registry"
)
/**
@@ -16,18 +20,24 @@ indicating that sdk not need to clear cache
*/
var (
- isWithinProtection bool
- startupTimestamp int64
enableInstanceNullProtect bool
restartProtectInterval time.Duration
RestartProtectHttpCode int
validProtectCode = map[int]struct{}{http.StatusNotModified: {}, http.StatusUnprocessableEntity: {}, http.StatusInternalServerError: {}}
+ // globalProtectionChecker is consulted by ShouldProtectOnNullInstance; nil means
+ // "always protect". watch() replaces it while request handlers read it, so this
+ // file only accesses it while holding globalProtectionCheckerMu.
+ globalProtectionChecker ProtectionChecker
+ globalProtectionCheckerMu sync.RWMutex
)
const (
maxInterval = 60 * 60 * 24 * time.Second
minInterval = 0 * time.Second
defaultRestartProtectInterval = 120 * time.Second
+ // defaultReadinessCheckInterval is how often watch() probes readiness; while the
+ // server is unready, null-instance protection is forced on again.
+ defaultReadinessCheckInterval = 5 * time.Second
)
func Init() {
@@ -50,24 +60,71 @@ func Init() {
log.Info(fmt.Sprintf("instance_null_protect.enable: %t", enableInstanceNullProtect))
log.Info(fmt.Sprintf("instance_null_protect.restart_protect_interval: %d", restartProtectInterval))
log.Info(fmt.Sprintf("instance_null_protect.http_status: %d", RestartProtectHttpCode))
- startupTimestamp = time.Now().UnixNano()
- isWithinProtection = true
+
+ StartProtectionAndStopDelayed()
+ gopool.Go(watch)
+
}
-func IsWithinRestartProtection() bool {
+// watch probes registry.Readiness every defaultReadinessCheckInterval until ctx is done.
+// While the server is unready protection is forced on (AlwaysProtection); once it is
+// ready again a delayed-stop checker is installed (StartProtectionAndStopDelayed).
+func watch(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(defaultReadinessCheckInterval):
+ err := registry.Readiness(ctx)
+ if err != nil {
+ AlwaysProtection()
+ continue
+ }
+ StartProtectionAndStopDelayed()
+ }
+ }
+}
+
+// ShouldProtectOnNullInstance reports whether an empty instance list should be
+// answered with RestartProtectHttpCode instead of being returned to the client.
+func ShouldProtectOnNullInstance() bool {
if !enableInstanceNullProtect {
return false
}
- if !isWithinProtection {
- return false
+ // Read the checker exactly once: watch() may swap it to nil concurrently, and a
+ // second read could then call CheckProtection on a nil interface and panic.
+ globalProtectionCheckerMu.RLock()
+ checker := globalProtectionChecker
+ globalProtectionCheckerMu.RUnlock()
+ if checker == nil { // protect by default
+ return true
}
- if time.Now().Add(-restartProtectInterval).UnixNano() > startupTimestamp {
- log.Info("restart protection stop")
- isWithinProtection = false
- return false
+ return checker.CheckProtection()
+}
+
+// StartProtectionAndStopDelayed installs a checker that protects for
+// restartProtectInterval and then stops. It is a no-op when such a checker is
+// already installed, so repeated calls do not restart the delay.
+func StartProtectionAndStopDelayed() {
+ globalProtectionCheckerMu.Lock()
+ defer globalProtectionCheckerMu.Unlock()
+ if _, ok := globalProtectionChecker.(*DelayedStopProtectChecker); ok {
+ return
+ }
+ globalProtectionChecker = NewDelayedSuccessChecker(restartProtectInterval)
+ log.Info("start protection and stop delayed on null instance")
+}
+
+// AlwaysProtection removes the installed checker so that protection stays on
+// until StartProtectionAndStopDelayed installs a new one.
+func AlwaysProtection() {
+ globalProtectionCheckerMu.Lock()
+ defer globalProtectionCheckerMu.Unlock()
+ if globalProtectionChecker == nil {
+ return
}
- log.Info("within restart protection")
- return true
+ globalProtectionChecker = nil
+ log.Info("always protect on null instance")
}
diff --git a/pkg/protect/protect_test.go b/pkg/protect/protect_test.go
index c7c2dd50..9c31cd81 100644
--- a/pkg/protect/protect_test.go
+++ b/pkg/protect/protect_test.go
@@ -12,19 +12,20 @@ func TestIsWithinRestartProtection(t *testing.T) {
// protection switch off
enableInstanceNullProtect = false
- assert.False(t, IsWithinRestartProtection())
- // within protection
+ assert.False(t, ShouldProtectOnNullInstance())
+
enableInstanceNullProtect = true
- isWithinProtection = true
- startupTimestamp = time.Now().Add(-1 * time.Minute).UnixNano()
- assert.True(t, IsWithinRestartProtection())
+ AlwaysProtection()
+ assert.True(t, ShouldProtectOnNullInstance())
// protection delay exceed
- enableInstanceNullProtect = true
- isWithinProtection = true
- startupTimestamp = time.Now().Add(-2 * time.Minute).Unix()
- assert.False(t, IsWithinRestartProtection())
+ restartProtectInterval = 100 * time.Millisecond
+ StartProtectionAndStopDelayed()
+ assert.True(t, ShouldProtectOnNullInstance())
+ time.Sleep(2 * restartProtectInterval)
+ assert.False(t, ShouldProtectOnNullInstance())
- // always false after exceed
- assert.False(t, IsWithinRestartProtection())
+ // a second call does not re-arm the already installed (expired) delayed checker
+ StartProtectionAndStopDelayed()
+ assert.False(t, ShouldProtectOnNullInstance())
}
diff --git a/server/alarm/common.go b/server/alarm/common.go
index 9da4a393..1825db7f 100644
--- a/server/alarm/common.go
+++ b/server/alarm/common.go
@@ -31,6 +31,7 @@ const (
const (
IDBackendConnectionRefuse model.ID = "BackendConnectionRefuse"
+ // IDScSelfHeartbeatFailed is raised when the heartbeat of the service center's
+ // own instance fails; while active it makes the default readiness check fail.
+ IDScSelfHeartbeatFailed model.ID = "ScSelfHeartbeatFailed"
IDInternalError model.ID = "InternalError"
IDIncrementPullError model.ID = "IncrementPullError"
IDWebsocketOfScSyncerLost model.ID = "WebsocketOfScSyncerLost"
diff --git a/server/health/health.go b/server/health/health.go
index f633027f..cbfb7c1a 100644
--- a/server/health/health.go
+++ b/server/health/health.go
@@ -23,18 +23,29 @@ import (
"github.com/apache/servicecomb-service-center/server/alarm"
)
-var healthChecker Checker = &DefaultHealthChecker{}
+// healthChecker is the checker returned by GlobalHealthChecker; it defaults to
+// NullChecker, which never reports an error.
+var healthChecker Checker = &NullChecker{}
+// readinessChecker is the checker returned by GlobalReadinessChecker; it defaults to
+// DefaultHealthChecker, which fails while a backend-connection or self-heartbeat alarm is not cleared.
+var readinessChecker Checker = &DefaultHealthChecker{}
type Checker interface {
Healthy() error
}
+// NullChecker is a Checker whose Healthy always returns nil.
+type NullChecker struct {
+}
+
+func (n NullChecker) Healthy() error {
+ return nil
+}
+
+// DefaultHealthChecker reports unhealthy while certain alarms are active.
type DefaultHealthChecker struct {
}
func (hc *DefaultHealthChecker) Healthy() error {
for _, a := range alarm.ListAll() {
- if a.ID == alarm.IDBackendConnectionRefuse && a.Status !=
alarm.Cleared {
+ // cleared alarms do not affect the result
+ if a.Status == alarm.Cleared {
+ continue
+ }
+ // an active backend-connection or self-heartbeat alarm fails the check,
+ // using the alarm's additional context as the error message
+ if a.ID == alarm.IDBackendConnectionRefuse || a.ID ==
alarm.IDScSelfHeartbeatFailed {
return
errors.New(a.FieldString(alarm.FieldAdditionalContext))
}
}
@@ -48,3 +59,11 @@ func SetGlobalHealthChecker(hc Checker) {
func GlobalHealthChecker() Checker {
return healthChecker
}
+
+// SetGlobalReadinessChecker replaces the checker returned by GlobalReadinessChecker.
+func SetGlobalReadinessChecker(hc Checker) {
+ readinessChecker = hc
+}
+
+// GlobalReadinessChecker returns the checker currently used for readiness.
+func GlobalReadinessChecker() Checker {
+ return readinessChecker
+}
diff --git a/server/interceptor/interceptors_test.go
b/server/interceptor/interceptors_test.go
index 4a494206..1d7ba69f 100644
--- a/server/interceptor/interceptors_test.go
+++ b/server/interceptor/interceptors_test.go
@@ -30,7 +30,7 @@ func mockFunc(w http.ResponseWriter, r *http.Request) error {
case 1:
return errors.New("error")
case 0:
- panic(errors.New("panic"))
+ // NOTE(review): this case used to panic; if a test relied on mockFunc to
+ // exercise panic recovery, that path is no longer covered — confirm.
+ return errors.New("panic")
default:
i++
}
diff --git a/server/plugin/auth/buildin/buildin_test.go
b/server/plugin/auth/buildin/buildin_test.go
index fa2e146f..b16cf661 100644
--- a/server/plugin/auth/buildin/buildin_test.go
+++ b/server/plugin/auth/buildin/buildin_test.go
@@ -27,7 +27,6 @@ import (
"testing"
"time"
- "github.com/form3tech-oss/jwt-go"
"github.com/patrickmn/go-cache"
_ "github.com/apache/servicecomb-service-center/test"
@@ -187,7 +186,7 @@ func TestSetTokenToCache(t *testing.T) {
rawToken1 := "**1"
rawToken2 := "**2"
deleta, _ := time.ParseDuration("10m")
- claims := &jwt.MapClaims{"exp": float64(time.Now().Add(deleta).Unix())}
+ // NOTE(review): claims is now a plain map instead of *jwt.MapClaims (the jwt-go
+ // import was dropped); confirm SetTokenToCache accepts and stores it the same way.
+ claims := map[string]interface{}{"exp":
float64(time.Now().Add(deleta).Unix())}
t.Run("test Cache", func(t *testing.T) {
buildin.SetTokenToCache(tokenCache1, rawToken1, claims)
buildin.SetTokenToCache(tokenCache1, rawToken2, errors.New("bad
token"))
diff --git a/server/resource/disco/instance_resource.go
b/server/resource/disco/instance_resource.go
index df5fb145..ccb8fc69 100644
--- a/server/resource/disco/instance_resource.go
+++ b/server/resource/disco/instance_resource.go
@@ -19,17 +19,16 @@ package disco
import (
"fmt"
- "github.com/apache/servicecomb-service-center/pkg/protect"
"io"
"net/http"
"strings"
- "github.com/go-chassis/go-chassis/v2/pkg/codec"
-
pb "github.com/go-chassis/cari/discovery"
+ "github.com/go-chassis/go-chassis/v2/pkg/codec"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/protect"
"github.com/apache/servicecomb-service-center/pkg/rest"
"github.com/apache/servicecomb-service-center/pkg/util"
discosvc
"github.com/apache/servicecomb-service-center/server/service/disco"
@@ -169,7 +168,7 @@ func (s *InstanceResource) FindInstances(w
http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotModified)
return
}
- if len(resp.Instances) == 0 && protect.IsWithinRestartProtection() {
+ // No instances found: while null-instance protection is active, reply with
+ // protect.RestartProtectHttpCode and no body instead of an empty list.
+ if len(resp.Instances) == 0 && protect.ShouldProtectOnNullInstance() {
w.WriteHeader(protect.RestartProtectHttpCode)
return
}
@@ -272,7 +271,7 @@ func (s *InstanceResource) ListInstance(w
http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotModified)
return
}
- if len(resp.Instances) == 0 && protect.IsWithinRestartProtection() {
+ // Same null-instance protection as in FindInstances above.
+ if len(resp.Instances) == 0 && protect.ShouldProtectOnNullInstance() {
w.WriteHeader(protect.RestartProtectHttpCode)
return
}
diff --git a/server/rest/controller/v4/main_controller.go
b/server/rest/controller/v4/main_controller.go
index 5dd745fa..e563cfca 100644
--- a/server/rest/controller/v4/main_controller.go
+++ b/server/rest/controller/v4/main_controller.go
@@ -22,6 +22,8 @@ import (
"net/http"
"sync"
+ "github.com/apache/servicecomb-service-center/server/service/registry"
+
discosvc
"github.com/apache/servicecomb-service-center/server/service/disco"
"github.com/apache/servicecomb-service-center/pkg/rest"
@@ -48,6 +50,7 @@ func (s *MainService) URLPatterns() []rest.Route {
return []rest.Route{
{Method: http.MethodGet, Path: "/v4/:project/registry/version",
Func: s.GetVersion},
{Method: http.MethodGet, Path: "/v4/:project/registry/health",
Func: s.ClusterHealth},
+ {Method: http.MethodGet, Path:
"/v4/:project/registry/health/readiness", Func: s.Readiness},
}
}
@@ -60,6 +63,15 @@ func (s *MainService) ClusterHealth(w http.ResponseWriter, r
*http.Request) {
rest.WriteResponse(w, r, nil, resp)
}
+// Readiness answers the readiness probe: it writes the error returned by
+// registry.Readiness as a service error, or an empty success response otherwise.
+func (s *MainService) Readiness(w http.ResponseWriter, r *http.Request) {
+ err := registry.Readiness(r.Context())
+ if err != nil {
+ rest.WriteServiceError(w, err)
+ return
+ }
+ rest.WriteResponse(w, r, nil, nil)
+}
+
func (s *MainService) GetVersion(w http.ResponseWriter, r *http.Request) {
parseVersionOnce.Do(func() {
result := Result{
diff --git a/server/service/rbac/rbac.go b/server/service/rbac/rbac.go
index 3c3ff5d1..aa666c92 100644
--- a/server/service/rbac/rbac.go
+++ b/server/service/rbac/rbac.go
@@ -74,6 +74,7 @@ func add2WhiteAPIList() {
rbac.Add2WhiteAPIList(APITokenGranter)
rbac.Add2WhiteAPIList("/v4/:project/registry/version", "/version")
rbac.Add2WhiteAPIList("/v4/:project/registry/health", "/health")
+ // NOTE(review): unlike the version/health entries above, only the /v4 path is
+ // added for readiness (no short alias) — confirm no alias route needs it.
+ rbac.Add2WhiteAPIList("/v4/:project/registry/health/readiness")
// user can list self permission without account get permission
Add2CheckPermWhiteAPIList(APISelfPerms)
diff --git a/server/service/registry/health.go b/server/service/registry/health.go
new file mode 100644
index 00000000..b9ccf836
--- /dev/null
+++ b/server/service/registry/health.go
@@ -0,0 +1,20 @@
+package registry
+
+import (
+ "context"
+
+ pb "github.com/go-chassis/cari/discovery"
+
+ "github.com/apache/servicecomb-service-center/server/health"
+)
+
+// Readiness reports whether the service center is ready to accept requests.
+// It consults the global readiness checker and wraps any failure as an
+// ErrUnhealthy discovery error; the context argument is currently unused.
+func Readiness(_ context.Context) error {
+ checkErr := health.GlobalReadinessChecker().Healthy()
+ if checkErr == nil {
+ return nil
+ }
+ return pb.NewError(pb.ErrUnhealthy, checkErr.Error())
+}
diff --git a/server/service/registry/health_test.go b/server/service/registry/health_test.go
new file mode 100644
index 00000000..68896874
--- /dev/null
+++ b/server/service/registry/health_test.go
@@ -0,0 +1,26 @@
+package registry
+
+import (
+ "context"
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/servicecomb-service-center/server/health"
+)
+
+type failingChecker struct{}
+
+func (failingChecker) Healthy() error { return errors.New("backend down") }
+
+// TestReadiness covers both the ready and the unready result of Readiness.
+func TestReadiness(t *testing.T) {
+ defer health.SetGlobalReadinessChecker(health.GlobalReadinessChecker())
+
+ health.SetGlobalReadinessChecker(&health.NullChecker{})
+ assert.NoError(t, Readiness(context.TODO()))
+
+ health.SetGlobalReadinessChecker(failingChecker{})
+ assert.Error(t, Readiness(context.TODO()))
+}
diff --git a/server/service/registry/registry.go b/server/service/registry/registry.go
index 65211d5b..991d8367 100644
--- a/server/service/registry/registry.go
+++ b/server/service/registry/registry.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/alarm"
"github.com/apache/servicecomb-service-center/server/core"
discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
"github.com/apache/servicecomb-service-center/server/service/sync"
@@ -132,14 +133,22 @@ func autoSelfHeartBeat() {
case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
err := selfHeartBeat(ctx)
if err == nil {
+ alarm.Clear(alarm.IDScSelfHeartbeatFailed)
continue
}
+ // Raise the alarm as soon as the heartbeat fails rather than after the retry below:
+ // the retry issues several etcd requests that may only time out much later, which
+ // would delay reporting the abnormal state.
+ alarm.Raise(alarm.IDScSelfHeartbeatFailed, alarm.AdditionalContext("%v", err))
// 服务不存在,创建服务
err = selfRegister(ctx)
if err != nil {
log.Error(fmt.Sprintf("retry to register[%s/%s/%s/%s] failed",
core.Service.Environment, core.Service.AppId, core.Service.ServiceName, core.Service.Version), err)
+ // Keep the alarm raised so readiness keeps failing until re-registration succeeds.
+ continue
}
+ alarm.Clear(alarm.IDScSelfHeartbeatFailed)
}
}
})