This is an automated email from the ASF dual-hosted git repository.

humingcheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/dev by this push:
     new 47b2e16b Feature: add sync readiness checker (#1499)
47b2e16b is described below

commit 47b2e16b87c5f3d929071b4afecd542d7d975f13
Author: holden-cpu <[email protected]>
AuthorDate: Thu Feb 20 19:26:27 2025 +0800

    Feature: add sync readiness checker (#1499)
    
    * Feature: add sync readiness checker
    
    ---------
    
    Co-authored-by: sunhaidong <[email protected]>
---
 server/health/health.go      | 54 ++++++++++++++++++++++++++++++++++++++++++--
 server/health/health_test.go | 40 ++++++++++++++++++++++++++++++++
 server/server.go             | 24 +++++++++++++++++++-
 syncer/rpc/server.go         | 18 +++++++++++++++
 syncer/service/sync/sync.go  |  6 +++++
 5 files changed, 139 insertions(+), 3 deletions(-)

diff --git a/server/health/health.go b/server/health/health.go
index cbfb7c1a..b5cb034e 100644
--- a/server/health/health.go
+++ b/server/health/health.go
@@ -19,12 +19,25 @@ package health
 
 import (
        "errors"
+       "fmt"
+       "time"
 
+       "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/server/alarm"
+       "github.com/apache/servicecomb-service-center/syncer/config"
+       "github.com/apache/servicecomb-service-center/syncer/rpc"
 )
 
-var healthChecker Checker = &NullChecker{}
-var readinessChecker Checker = &DefaultHealthChecker{}
+var (
+       healthChecker        Checker = &NullChecker{}
+       readinessChecker     Checker = &DefaultHealthChecker{}
+       syncReadinessChecker Checker = &SyncReadinessChecker{}
+)
+
+var (
+       scNotReadyError     = errors.New("sc api server is not ready")
+       syncerNotReadyError = errors.New("the syncer module is not ready")
+)
 
 type Checker interface {
        Healthy() error
@@ -40,7 +53,41 @@ func (n NullChecker) Healthy() error {
 type DefaultHealthChecker struct {
 }
 
+type SyncReadinessChecker struct {
+       startupTime time.Time
+}
+
+func SetStartupTime(startupTime time.Time) {
+       syncReadinessChecker.(*SyncReadinessChecker).startupTime = startupTime
+}
+
+func (src *SyncReadinessChecker) Healthy() error {
+       err := defaultHealth()
+       if err != nil {
+               return err
+       }
+       if src.startupTime.IsZero() {
+               return scNotReadyError
+       }
+       passTime := src.startupTime.Add(30 * time.Second)
+       if !rpc.IsNotReceiveSyncRequest() && rpc.GetFirstReceiveTime().Sub(src.startupTime) < 30*time.Second {
+               passTime = passTime.Add(rpc.GetFirstReceiveTime().Sub(src.startupTime))
+       } else {
+               log.Warn(fmt.Sprintf("first sync request is not received or received 30 seconds after startup,%s,%s", rpc.GetFirstReceiveTime(), src.startupTime))
+               passTime = passTime.Add(30 * time.Second)
+       }
+       nowTime := time.Now()
+       if nowTime.After(passTime) {
+               return nil
+       }
+       return syncerNotReadyError
+}
+
 func (hc *DefaultHealthChecker) Healthy() error {
+       return defaultHealth()
+}
+
+func defaultHealth() error {
        for _, a := range alarm.ListAll() {
                if a.Status == alarm.Cleared {
                        continue
@@ -65,5 +112,8 @@ func SetGlobalReadinessChecker(hc Checker) {
 }
 
 func GlobalReadinessChecker() Checker {
+       if config.GetConfig().Sync.EnableOnStart {
+               return syncReadinessChecker
+       }
        return readinessChecker
 }
diff --git a/server/health/health_test.go b/server/health/health_test.go
index 282a0f0a..3848a5b5 100644
--- a/server/health/health_test.go
+++ b/server/health/health_test.go
@@ -21,8 +21,11 @@ import (
        "testing"
        "time"
 
+       "github.com/stretchr/testify/assert"
+
        "github.com/apache/servicecomb-service-center/server/alarm"
        "github.com/apache/servicecomb-service-center/server/event"
+       "github.com/apache/servicecomb-service-center/syncer/rpc"
 )
 
 func TestDefaultHealthChecker_Healthy(t *testing.T) {
@@ -55,3 +58,40 @@ func TestDefaultHealthChecker_Healthy(t *testing.T) {
                t.Fatal("TestDefaultHealthChecker_Healthy failed")
        }
 }
+
+func TestHealthy(t *testing.T) {
+       now := time.Now()
+       t.Run("sync_not_start", func(t *testing.T) {
+               err := syncReadinessChecker.Healthy()
+               assert.ErrorIs(t, err, scNotReadyError)
+       })
+
+       t.Run("no_sync", func(t *testing.T) {
+               // No sync request has been received, and nowTime.Before(passTime)
+               src := &SyncReadinessChecker{startupTime: now}
+               err := src.Healthy()
+               assert.ErrorIs(t, err, syncerNotReadyError)
+       })
+
+       t.Run("no_sync_but_exceeds_60s", func(t *testing.T) {
+               // No sync request has been received, but the maximum wait time has been exceeded
+               src := &SyncReadinessChecker{startupTime: now.Add(-60 * time.Second)}
+               err := src.Healthy()
+               assert.Nil(t, err)
+       })
+
+       t.Run("sync_and_before", func(t *testing.T) {
+               // The first sync request is received within 30s, and nowTime.Before(passTime)
+               src := &SyncReadinessChecker{startupTime: now}
+               err := src.Healthy()
+               rpc.RecordFirstReceivedRequestTime()
+               assert.ErrorIs(t, err, syncerNotReadyError)
+       })
+
+       t.Run("31s_sync_and_before", func(t *testing.T) {
+               // The first sync request is received after 30s, and nowTime.Before(passTime)
+               src := &SyncReadinessChecker{startupTime: now.Add(-31 * time.Second)}
+               err := src.Healthy()
+               assert.ErrorIs(t, err, syncerNotReadyError)
+       })
+}
diff --git a/server/server.go b/server/server.go
index b05be2f3..45de81fd 100644
--- a/server/server.go
+++ b/server/server.go
@@ -20,8 +20,11 @@ package server
 import (
        "context"
        "crypto/tls"
-       "github.com/apache/servicecomb-service-center/pkg/protect"
        "os"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/pkg/protect"
+       "github.com/apache/servicecomb-service-center/server/health"
 
        "github.com/gofiber/fiber/v2"
 
@@ -47,6 +50,11 @@ import (
        "github.com/apache/servicecomb-service-center/server/service/rbac"
 )
 
+const (
+       apiServerStartCheckInterval = 1 * time.Second
+       apiServerStartCheckTimes    = 120 // check for 2 minutes in total
+)
+
 var sc ServiceCenterServer
 
 func Run() {
@@ -76,9 +84,23 @@ func (s *ServiceCenterServer) Run() {
 
        signal.RegisterListener()
 
+       go initScStartupTime()
+
        s.waitForQuit()
 }
 
+func initScStartupTime() {
+       i := 1
+       for ; i <= apiServerStartCheckTimes; i++ {
+               time.Sleep(apiServerStartCheckInterval)
+               // wait for the sc api server to finish initializing
+               if !GetAPIServer().IsClose() {
+                       health.SetStartupTime(time.Now())
+                       break
+               }
+       }
+}
+
 func (s *ServiceCenterServer) startChassis() {
        go func() {
                mask := make([]string, 0)
diff --git a/syncer/rpc/server.go b/syncer/rpc/server.go
index 83fc8f39..fa297819 100644
--- a/syncer/rpc/server.go
+++ b/syncer/rpc/server.go
@@ -40,6 +40,8 @@ const (
        RbacAllowedRoleName    = "sync-admin"
 )
 
+var firstReceiveTime time.Time
+
 func NewServer() *Server {
        return &Server{
                replicator: replicator.Manager(),
@@ -53,6 +55,7 @@ type Server struct {
 }
 
 func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Results, error) {
+       RecordFirstReceivedRequestTime()
        err := auth(ctx)
        if err != nil {
                log.Error("auth failed", err)
@@ -66,6 +69,21 @@ func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Re
        return s.toResults(res), nil
 }
 
+func RecordFirstReceivedRequestTime() {
+       if IsNotReceiveSyncRequest() {
+               firstReceiveTime = time.Now()
+               log.Info(fmt.Sprintf("receive first received request time: %s", firstReceiveTime))
+       }
+}
+
+func IsNotReceiveSyncRequest() bool {
+       return firstReceiveTime.IsZero()
+}
+
+func GetFirstReceiveTime() time.Time {
+       return firstReceiveTime
+}
+
 func generateFailedResults(events *v1sync.EventList, err error) (*v1sync.Results, error) {
        if events == nil || len(events.Events) == 0 {
                return &v1sync.Results{Results: map[string]*v1sync.Result{}}, nil
diff --git a/syncer/service/sync/sync.go b/syncer/service/sync/sync.go
index ed795097..0ed5662b 100644
--- a/syncer/service/sync/sync.go
+++ b/syncer/service/sync/sync.go
@@ -19,6 +19,7 @@ package sync
 
 import (
        "fmt"
+       "time"
 
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/syncer/service/event"
@@ -26,6 +27,11 @@ import (
        "github.com/apache/servicecomb-service-center/syncer/service/task"
 )
 
+const (
+       apiServerStartCheckInterval = 1 * time.Second
+       apiServerStartCheckTimes    = 120 // check for 2 minutes in total
+)
+
 func Init() {
        err := replicator.Work()
        if err != nil {

Reply via email to