This is an automated email from the ASF dual-hosted git repository.

youling1128 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4a07d7e6 Add readiness probe api (#1497)
4a07d7e6 is described below

commit 4a07d7e6e7264f811ca0ed4321a141296b37f1cb
Author: humingcheng <[email protected]>
AuthorDate: Sat Jan 18 15:13:07 2025 +0800

    Add readiness probe api (#1497)
---
 docs/openapi/v4.yaml                         | 26 ++++++++++++
 pkg/protect/checker.go                       | 42 ++++++++++++++++++++
 pkg/protect/checker_test.go                  | 15 +++++++
 pkg/protect/protect.go                       | 59 +++++++++++++++++++++-------
 pkg/protect/protect_test.go                  | 23 +++++------
 server/alarm/common.go                       |  1 +
 server/health/health.go                      | 23 ++++++++++-
 server/interceptor/interceptors_test.go      |  2 +-
 server/plugin/auth/buildin/buildin_test.go   |  3 +-
 server/resource/disco/instance_resource.go   |  9 ++---
 server/rest/controller/v4/main_controller.go | 12 ++++++
 server/service/rbac/rbac.go                  |  1 +
 server/service/registry/health.go            | 16 ++++++++
 server/service/registry/health_test.go       | 14 +++++++
 server/service/registry/registry.go          |  6 +++
 15 files changed, 217 insertions(+), 35 deletions(-)

diff --git a/docs/openapi/v4.yaml b/docs/openapi/v4.yaml
index 5fad67c4..d019ed6c 100644
--- a/docs/openapi/v4.yaml
+++ b/docs/openapi/v4.yaml
@@ -83,6 +83,32 @@ paths:
           description: 内部错误
           schema:
             $ref: '#/definitions/Error'
+  /v4/{project}/registry/health/readiness:
+    get:
+      description: |
+        服务中心readiness探测接口,确认服务中心是否可以接收请求。
+      parameters:
+        - name: x-domain-name
+          in: header
+          type: string
+          default: default
+        - name: project
+          in: path
+          required: true
+          type: string
+      tags:
+        - base
+      responses:
+        200:
+          description: 状态正常,可以接收请求
+        400:
+          description: 错误的请求
+          schema:
+            $ref: '#/definitions/Error'
+        500:
+          description: 内部错误
+          schema:
+            $ref: '#/definitions/Error'
   /v4/{project}/registry/microservices/{serviceId}:
     get:
       description: |
diff --git a/pkg/protect/checker.go b/pkg/protect/checker.go
new file mode 100644
index 00000000..fbbc85ec
--- /dev/null
+++ b/pkg/protect/checker.go
@@ -0,0 +1,42 @@
+package protect
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+)
+
+// ProtectionChecker checks whether to do the protection on null instance
+type ProtectionChecker interface {
+       CheckProtection() bool
+}
+
+// DelayedStopProtectChecker means start the protection and close it after some time
+type DelayedStopProtectChecker struct {
+       delay       time.Duration
+       start       time.Time
+       logWhenStop sync.Once
+}
+
+func NewDelayedSuccessChecker(delay time.Duration) *DelayedStopProtectChecker {
+       return &DelayedStopProtectChecker{
+               delay:       delay,
+               start:       time.Now(),
+               logWhenStop: sync.Once{},
+       }
+}
+
+func (d *DelayedStopProtectChecker) CheckProtection() bool {
+       if time.Since(d.start) > d.delay {
+               d.logWhenStop.Do(func() {
+                       log.Info("null instance protection stopped")
+               })
+               return false
+       }
+       return true
+}
+
+func GetGlobalProtectionChecker() ProtectionChecker {
+       return globalProtectionChecker
+}
diff --git a/pkg/protect/checker_test.go b/pkg/protect/checker_test.go
new file mode 100644
index 00000000..f6c16b9b
--- /dev/null
+++ b/pkg/protect/checker_test.go
@@ -0,0 +1,15 @@
+package protect
+
+import (
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestDelayedStopProtectChecker_CheckProtection(t *testing.T) {
+       p := NewDelayedSuccessChecker(1 * time.Second)
+       assert.True(t, p.CheckProtection())
+       time.Sleep(p.delay)
+       assert.False(t, p.CheckProtection())
+}
diff --git a/pkg/protect/protect.go b/pkg/protect/protect.go
index 1649b3e4..a35b3842 100644
--- a/pkg/protect/protect.go
+++ b/pkg/protect/protect.go
@@ -1,13 +1,16 @@
 package protect
 
 import (
+       "context"
        "fmt"
        "net/http"
        "time"
 
-       "github.com/apache/servicecomb-service-center/server/config"
+       "github.com/go-chassis/foundation/gopool"
 
        "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/server/config"
+       "github.com/apache/servicecomb-service-center/server/service/registry"
 )
 
 /**
@@ -16,18 +19,18 @@ indicating that sdk not need to clear cache
 */
 
 var (
-       isWithinProtection        bool
-       startupTimestamp          int64
        enableInstanceNullProtect bool
        restartProtectInterval    time.Duration
        RestartProtectHttpCode    int
        validProtectCode          = map[int]struct{}{http.StatusNotModified: {}, http.StatusUnprocessableEntity: {}, http.StatusInternalServerError: {}}
+       globalProtectionChecker   ProtectionChecker
 )
 
 const (
        maxInterval                   = 60 * 60 * 24 * time.Second
        minInterval                   = 0 * time.Second
        defaultRestartProtectInterval = 120 * time.Second
+       defaultReadinessCheckInterval = 5 * time.Second // the null instance protection should start again when server is unready
 )
 
 func Init() {
@@ -50,24 +53,52 @@ func Init() {
        log.Info(fmt.Sprintf("instance_null_protect.enable: %t", enableInstanceNullProtect))
        log.Info(fmt.Sprintf("instance_null_protect.restart_protect_interval: %d", restartProtectInterval))
        log.Info(fmt.Sprintf("instance_null_protect.http_status: %d", RestartProtectHttpCode))
-       startupTimestamp = time.Now().UnixNano()
-       isWithinProtection = true
+
+       StartProtectionAndStopDelayed()
+       gopool.Go(watch)
+
 }
 
-func IsWithinRestartProtection() bool {
+func watch(ctx context.Context) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-time.After(defaultReadinessCheckInterval):
+                       err := registry.Readiness(ctx)
+                       if err != nil {
+                               AlwaysProtection()
+                               continue
+                       }
+                       StartProtectionAndStopDelayed()
+               }
+       }
+}
+
+func ShouldProtectOnNullInstance() bool {
        if !enableInstanceNullProtect {
                return false
        }
 
-       if !isWithinProtection {
-               return false
+       if globalProtectionChecker == nil { // protect by default
+               return true
        }
 
-       if time.Now().Add(-restartProtectInterval).UnixNano() > startupTimestamp {
-               log.Info("restart protection stop")
-               isWithinProtection = false
-               return false
+       return GetGlobalProtectionChecker().CheckProtection()
+}
+
+func StartProtectionAndStopDelayed() {
+       if _, ok := globalProtectionChecker.(*DelayedStopProtectChecker); ok {
+               return
+       }
+       globalProtectionChecker = NewDelayedSuccessChecker(restartProtectInterval)
+       log.Info("start protection and stop delayed on null instance")
+}
+
+func AlwaysProtection() {
+       if globalProtectionChecker == nil {
+               return
        }
-       log.Info("within restart protection")
-       return true
+       globalProtectionChecker = nil
+       log.Info("always protect on null instance")
 }
diff --git a/pkg/protect/protect_test.go b/pkg/protect/protect_test.go
index c7c2dd50..9c31cd81 100644
--- a/pkg/protect/protect_test.go
+++ b/pkg/protect/protect_test.go
@@ -12,19 +12,20 @@ func TestIsWithinRestartProtection(t *testing.T) {
 
        // protection switch off
        enableInstanceNullProtect = false
-       assert.False(t, IsWithinRestartProtection())
-       // within protection
+       assert.False(t, ShouldProtectOnNullInstance())
+
        enableInstanceNullProtect = true
-       isWithinProtection = true
-       startupTimestamp = time.Now().Add(-1 * time.Minute).UnixNano()
-       assert.True(t, IsWithinRestartProtection())
+       AlwaysProtection()
+       assert.True(t, ShouldProtectOnNullInstance())
 
        // protection delay exceed
-       enableInstanceNullProtect = true
-       isWithinProtection = true
-       startupTimestamp = time.Now().Add(-2 * time.Minute).Unix()
-       assert.False(t, IsWithinRestartProtection())
+       restartProtectInterval = 1 * time.Second
+       StartProtectionAndStopDelayed()
+       assert.True(t, ShouldProtectOnNullInstance())
+       time.Sleep(restartProtectInterval)
+       assert.False(t, ShouldProtectOnNullInstance())
 
-       // always false after exceed
-       assert.False(t, IsWithinRestartProtection())
+       // only the first takes effects
+       StartProtectionAndStopDelayed()
+       assert.False(t, ShouldProtectOnNullInstance())
 }
diff --git a/server/alarm/common.go b/server/alarm/common.go
index 9da4a393..1825db7f 100644
--- a/server/alarm/common.go
+++ b/server/alarm/common.go
@@ -31,6 +31,7 @@ const (
 
 const (
        IDBackendConnectionRefuse model.ID = "BackendConnectionRefuse"
+       IDScSelfHeartbeatFailed   model.ID = "ScSelfHeartbeatFailed"
        IDInternalError           model.ID = "InternalError"
        IDIncrementPullError      model.ID = "IncrementPullError"
        IDWebsocketOfScSyncerLost model.ID = "WebsocketOfScSyncerLost"
diff --git a/server/health/health.go b/server/health/health.go
index f633027f..cbfb7c1a 100644
--- a/server/health/health.go
+++ b/server/health/health.go
@@ -23,18 +23,29 @@ import (
        "github.com/apache/servicecomb-service-center/server/alarm"
 )
 
-var healthChecker Checker = &DefaultHealthChecker{}
+var healthChecker Checker = &NullChecker{}
+var readinessChecker Checker = &DefaultHealthChecker{}
 
 type Checker interface {
        Healthy() error
 }
 
+type NullChecker struct {
+}
+
+func (n NullChecker) Healthy() error {
+       return nil
+}
+
 type DefaultHealthChecker struct {
 }
 
 func (hc *DefaultHealthChecker) Healthy() error {
        for _, a := range alarm.ListAll() {
-               if a.ID == alarm.IDBackendConnectionRefuse && a.Status != alarm.Cleared {
+               if a.Status == alarm.Cleared {
+                       continue
+               }
+               if a.ID == alarm.IDBackendConnectionRefuse || a.ID == alarm.IDScSelfHeartbeatFailed {
                        return errors.New(a.FieldString(alarm.FieldAdditionalContext))
                }
        }
@@ -48,3 +59,11 @@ func SetGlobalHealthChecker(hc Checker) {
 func GlobalHealthChecker() Checker {
        return healthChecker
 }
+
+func SetGlobalReadinessChecker(hc Checker) {
+       readinessChecker = hc
+}
+
+func GlobalReadinessChecker() Checker {
+       return readinessChecker
+}
diff --git a/server/interceptor/interceptors_test.go b/server/interceptor/interceptors_test.go
index 4a494206..1d7ba69f 100644
--- a/server/interceptor/interceptors_test.go
+++ b/server/interceptor/interceptors_test.go
@@ -30,7 +30,7 @@ func mockFunc(w http.ResponseWriter, r *http.Request) error {
        case 1:
                return errors.New("error")
        case 0:
-               panic(errors.New("panic"))
+               return errors.New("panic")
        default:
                i++
        }
diff --git a/server/plugin/auth/buildin/buildin_test.go b/server/plugin/auth/buildin/buildin_test.go
index fa2e146f..b16cf661 100644
--- a/server/plugin/auth/buildin/buildin_test.go
+++ b/server/plugin/auth/buildin/buildin_test.go
@@ -27,7 +27,6 @@ import (
        "testing"
        "time"
 
-       "github.com/form3tech-oss/jwt-go"
        "github.com/patrickmn/go-cache"
 
        _ "github.com/apache/servicecomb-service-center/test"
@@ -187,7 +186,7 @@ func TestSetTokenToCache(t *testing.T) {
        rawToken1 := "**1"
        rawToken2 := "**2"
        deleta, _ := time.ParseDuration("10m")
-       claims := &jwt.MapClaims{"exp": float64(time.Now().Add(deleta).Unix())}
+       claims := map[string]interface{}{"exp": float64(time.Now().Add(deleta).Unix())}
        t.Run("test Cache", func(t *testing.T) {
                buildin.SetTokenToCache(tokenCache1, rawToken1, claims)
                buildin.SetTokenToCache(tokenCache1, rawToken2, errors.New("bad token"))
diff --git a/server/resource/disco/instance_resource.go b/server/resource/disco/instance_resource.go
index df5fb145..ccb8fc69 100644
--- a/server/resource/disco/instance_resource.go
+++ b/server/resource/disco/instance_resource.go
@@ -19,17 +19,16 @@ package disco
 
 import (
        "fmt"
-       "github.com/apache/servicecomb-service-center/pkg/protect"
        "io"
        "net/http"
        "strings"
 
-       "github.com/go-chassis/go-chassis/v2/pkg/codec"
-
        pb "github.com/go-chassis/cari/discovery"
+       "github.com/go-chassis/go-chassis/v2/pkg/codec"
 
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/protect"
        "github.com/apache/servicecomb-service-center/pkg/rest"
        "github.com/apache/servicecomb-service-center/pkg/util"
        discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
@@ -169,7 +168,7 @@ func (s *InstanceResource) FindInstances(w http.ResponseWriter, r *http.Request)
                w.WriteHeader(http.StatusNotModified)
                return
        }
-       if len(resp.Instances) == 0 && protect.IsWithinRestartProtection() {
+       if len(resp.Instances) == 0 && protect.ShouldProtectOnNullInstance() {
                w.WriteHeader(protect.RestartProtectHttpCode)
                return
        }
@@ -272,7 +271,7 @@ func (s *InstanceResource) ListInstance(w http.ResponseWriter, r *http.Request)
                w.WriteHeader(http.StatusNotModified)
                return
        }
-       if len(resp.Instances) == 0 && protect.IsWithinRestartProtection() {
+       if len(resp.Instances) == 0 && protect.ShouldProtectOnNullInstance() {
                w.WriteHeader(protect.RestartProtectHttpCode)
                return
        }
diff --git a/server/rest/controller/v4/main_controller.go b/server/rest/controller/v4/main_controller.go
index 5dd745fa..e563cfca 100644
--- a/server/rest/controller/v4/main_controller.go
+++ b/server/rest/controller/v4/main_controller.go
@@ -22,6 +22,8 @@ import (
        "net/http"
        "sync"
 
+       "github.com/apache/servicecomb-service-center/server/service/registry"
+
        discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
 
        "github.com/apache/servicecomb-service-center/pkg/rest"
@@ -48,6 +50,7 @@ func (s *MainService) URLPatterns() []rest.Route {
        return []rest.Route{
                {Method: http.MethodGet, Path: "/v4/:project/registry/version", Func: s.GetVersion},
                {Method: http.MethodGet, Path: "/v4/:project/registry/health", Func: s.ClusterHealth},
+               {Method: http.MethodGet, Path: "/v4/:project/registry/health/readiness", Func: s.Readiness},
        }
 }
 
@@ -60,6 +63,15 @@ func (s *MainService) ClusterHealth(w http.ResponseWriter, r *http.Request) {
        rest.WriteResponse(w, r, nil, resp)
 }
 
+func (s *MainService) Readiness(w http.ResponseWriter, r *http.Request) {
+       err := registry.Readiness(r.Context())
+       if err != nil {
+               rest.WriteServiceError(w, err)
+               return
+       }
+       rest.WriteResponse(w, r, nil, nil)
+}
+
 func (s *MainService) GetVersion(w http.ResponseWriter, r *http.Request) {
        parseVersionOnce.Do(func() {
                result := Result{
diff --git a/server/service/rbac/rbac.go b/server/service/rbac/rbac.go
index 3c3ff5d1..aa666c92 100644
--- a/server/service/rbac/rbac.go
+++ b/server/service/rbac/rbac.go
@@ -74,6 +74,7 @@ func add2WhiteAPIList() {
        rbac.Add2WhiteAPIList(APITokenGranter)
        rbac.Add2WhiteAPIList("/v4/:project/registry/version", "/version")
        rbac.Add2WhiteAPIList("/v4/:project/registry/health", "/health")
+       rbac.Add2WhiteAPIList("/v4/:project/registry/health/readiness")
 
        // user can list self permission without account get permission
        Add2CheckPermWhiteAPIList(APISelfPerms)
diff --git a/server/service/registry/health.go b/server/service/registry/health.go
new file mode 100644
index 00000000..b9ccf836
--- /dev/null
+++ b/server/service/registry/health.go
@@ -0,0 +1,16 @@
+package registry
+
+import (
+       "context"
+
+       pb "github.com/go-chassis/cari/discovery"
+
+       "github.com/apache/servicecomb-service-center/server/health"
+)
+
+func Readiness(_ context.Context) error {
+       if err := health.GlobalReadinessChecker().Healthy(); err != nil {
+               return pb.NewError(pb.ErrUnhealthy, err.Error())
+       }
+       return nil
+}
diff --git a/server/service/registry/health_test.go b/server/service/registry/health_test.go
new file mode 100644
index 00000000..68896874
--- /dev/null
+++ b/server/service/registry/health_test.go
@@ -0,0 +1,14 @@
+package registry
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/servicecomb-service-center/server/health"
+)
+
+func TestReadiness(t *testing.T) {
+       health.SetGlobalReadinessChecker(&health.NullChecker{})
+       assert.NoError(t, Readiness(nil))
+}
diff --git a/server/service/registry/registry.go b/server/service/registry/registry.go
index 65211d5b..991d8367 100644
--- a/server/service/registry/registry.go
+++ b/server/service/registry/registry.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/alarm"
        "github.com/apache/servicecomb-service-center/server/core"
        discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
        "github.com/apache/servicecomb-service-center/server/service/sync"
@@ -132,14 +133,19 @@ func autoSelfHeartBeat() {
                        case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
                                err := selfHeartBeat(ctx)
                                if err == nil {
+                                       alarm.Clear(alarm.IDScSelfHeartbeatFailed)
                                        continue
                                }
+                               // 为什么不在重试失败后再上报告警?
+                               // 重试过程有多个etcd请求,这些请求在很长时间后才会超时失败,影响异常状态的刷新,因此心跳失败后立即上报告警
+                               alarm.Raise(alarm.IDScSelfHeartbeatFailed, alarm.AdditionalContext("%v", err))
                                // 服务不存在,创建服务
                                err = selfRegister(ctx)
                                if err != nil {
                                        log.Error(fmt.Sprintf("retry to register[%s/%s/%s/%s] failed",
                                                core.Service.Environment, core.Service.AppId, core.Service.ServiceName, core.Service.Version), err)
                                }
+                               alarm.Clear(alarm.IDScSelfHeartbeatFailed)
                        }
                }
        })

Reply via email to