This is an automated email from the ASF dual-hosted git repository.
humingcheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/dev by this push:
new 47b2e16b Feature: add sync readiness checker (#1499)
47b2e16b is described below
commit 47b2e16b87c5f3d929071b4afecd542d7d975f13
Author: holden-cpu <[email protected]>
AuthorDate: Thu Feb 20 19:26:27 2025 +0800
Feature: add sync readiness checker (#1499)
* Feature: add sync readiness checker
---------
Co-authored-by: sunhaidong <[email protected]>
---
server/health/health.go | 54 ++++++++++++++++++++++++++++++++++++++++++--
server/health/health_test.go | 40 ++++++++++++++++++++++++++++++++
server/server.go | 24 +++++++++++++++++++-
syncer/rpc/server.go | 18 +++++++++++++++
syncer/service/sync/sync.go | 6 +++++
5 files changed, 139 insertions(+), 3 deletions(-)
diff --git a/server/health/health.go b/server/health/health.go
index cbfb7c1a..b5cb034e 100644
--- a/server/health/health.go
+++ b/server/health/health.go
@@ -19,12 +19,25 @@ package health
import (
"errors"
+ "fmt"
+ "time"
+ "github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/alarm"
+ "github.com/apache/servicecomb-service-center/syncer/config"
+ "github.com/apache/servicecomb-service-center/syncer/rpc"
)
-var healthChecker Checker = &NullChecker{}
-var readinessChecker Checker = &DefaultHealthChecker{}
+var (
+ healthChecker Checker = &NullChecker{}
+ readinessChecker Checker = &DefaultHealthChecker{}
+ syncReadinessChecker Checker = &SyncReadinessChecker{}
+)
+
+var (
+ scNotReadyError = errors.New("sc api server is not ready")
+ syncerNotReadyError = errors.New("the syncer module is not ready")
+)
type Checker interface {
Healthy() error
@@ -40,7 +53,41 @@ func (n NullChecker) Healthy() error {
type DefaultHealthChecker struct {
}
+type SyncReadinessChecker struct {
+ startupTime time.Time
+}
+
+func SetStartupTime(startupTime time.Time) {
+ syncReadinessChecker.(*SyncReadinessChecker).startupTime = startupTime
+}
+
+func (src *SyncReadinessChecker) Healthy() error {
+ err := defaultHealth()
+ if err != nil {
+ return err
+ }
+ if src.startupTime.IsZero() {
+ return scNotReadyError
+ }
+ passTime := src.startupTime.Add(30 * time.Second)
+ if !rpc.IsNotReceiveSyncRequest() && rpc.GetFirstReceiveTime().Sub(src.startupTime) < 30*time.Second {
+ passTime = passTime.Add(rpc.GetFirstReceiveTime().Sub(src.startupTime))
+ } else {
+ log.Warn(fmt.Sprintf("first sync request is not received or received 30 seconds after startup,%s,%s", rpc.GetFirstReceiveTime(), src.startupTime))
+ passTime = passTime.Add(30 * time.Second)
+ }
+ nowTime := time.Now()
+ if nowTime.After(passTime) {
+ return nil
+ }
+ return syncerNotReadyError
+}
+
func (hc *DefaultHealthChecker) Healthy() error {
+ return defaultHealth()
+}
+
+func defaultHealth() error {
for _, a := range alarm.ListAll() {
if a.Status == alarm.Cleared {
continue
@@ -65,5 +112,8 @@ func SetGlobalReadinessChecker(hc Checker) {
}
func GlobalReadinessChecker() Checker {
+ if config.GetConfig().Sync.EnableOnStart {
+ return syncReadinessChecker
+ }
return readinessChecker
}
diff --git a/server/health/health_test.go b/server/health/health_test.go
index 282a0f0a..3848a5b5 100644
--- a/server/health/health_test.go
+++ b/server/health/health_test.go
@@ -21,8 +21,11 @@ import (
"testing"
"time"
+ "github.com/stretchr/testify/assert"
+
"github.com/apache/servicecomb-service-center/server/alarm"
"github.com/apache/servicecomb-service-center/server/event"
+ "github.com/apache/servicecomb-service-center/syncer/rpc"
)
func TestDefaultHealthChecker_Healthy(t *testing.T) {
@@ -55,3 +58,40 @@ func TestDefaultHealthChecker_Healthy(t *testing.T) {
t.Fatal("TestDefaultHealthChecker_Healthy failed")
}
}
+
+func TestHealthy(t *testing.T) {
+ now := time.Now()
+ t.Run("sync_not_start", func(t *testing.T) {
+ err := syncReadinessChecker.Healthy()
+ assert.ErrorIs(t, err, scNotReadyError)
+ })
+
+ t.Run("no_sync", func(t *testing.T) {
+ // 未接受到同步请求,并且nowTime.Before(passTime)
+ src := &SyncReadinessChecker{startupTime: now}
+ err := src.Healthy()
+ assert.ErrorIs(t, err, syncerNotReadyError)
+ })
+
+ t.Run("no_sync_but_exceeds_60s", func(t *testing.T) {
+ // 未接受到同步请求,但是超出最大等待时间
+ src := &SyncReadinessChecker{startupTime: now.Add(-60 * time.Second)}
+ err := src.Healthy()
+ assert.Nil(t, err)
+ })
+
+ t.Run("sync_and_before", func(t *testing.T) {
+ // 30s内接受到第一次sync请求,并且nowTime.Before(passTime)
+ src := &SyncReadinessChecker{startupTime: now}
+ err := src.Healthy()
+ rpc.RecordFirstReceivedRequestTime()
+ assert.ErrorIs(t, err, syncerNotReadyError)
+ })
+
+ t.Run("31s_sync_and_before", func(t *testing.T) {
+ // 30s后接受到第一次sync请求,并且nowTime.Before(passTime)
+ src := &SyncReadinessChecker{startupTime: now.Add(-31 * time.Second)}
+ err := src.Healthy()
+ assert.ErrorIs(t, err, syncerNotReadyError)
+ })
+}
diff --git a/server/server.go b/server/server.go
index b05be2f3..45de81fd 100644
--- a/server/server.go
+++ b/server/server.go
@@ -20,8 +20,11 @@ package server
import (
"context"
"crypto/tls"
- "github.com/apache/servicecomb-service-center/pkg/protect"
"os"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/protect"
+ "github.com/apache/servicecomb-service-center/server/health"
"github.com/gofiber/fiber/v2"
@@ -47,6 +50,11 @@ import (
"github.com/apache/servicecomb-service-center/server/service/rbac"
)
+const (
+ apiServerStartCheckInterval = 1 * time.Second
+ apiServerStartCheckTimes = 120 // 共检查2分钟
+)
+
var sc ServiceCenterServer
func Run() {
@@ -76,9 +84,23 @@ func (s *ServiceCenterServer) Run() {
signal.RegisterListener()
+ go initScStartupTime()
+
s.waitForQuit()
}
+func initScStartupTime() {
+ i := 1
+ for ; i <= apiServerStartCheckTimes; i++ {
+ time.Sleep(apiServerStartCheckInterval)
+ // 等待sc api server初始化完成
+ if !GetAPIServer().IsClose() {
+ health.SetStartupTime(time.Now())
+ break
+ }
+ }
+}
+
func (s *ServiceCenterServer) startChassis() {
go func() {
mask := make([]string, 0)
diff --git a/syncer/rpc/server.go b/syncer/rpc/server.go
index 83fc8f39..fa297819 100644
--- a/syncer/rpc/server.go
+++ b/syncer/rpc/server.go
@@ -40,6 +40,8 @@ const (
RbacAllowedRoleName = "sync-admin"
)
+var firstReceiveTime time.Time
+
func NewServer() *Server {
return &Server{
replicator: replicator.Manager(),
@@ -53,6 +55,7 @@ type Server struct {
}
func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Results, error) {
+ RecordFirstReceivedRequestTime()
err := auth(ctx)
if err != nil {
log.Error("auth failed", err)
@@ -66,6 +69,21 @@ func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Re
return s.toResults(res), nil
}
+func RecordFirstReceivedRequestTime() {
+ if IsNotReceiveSyncRequest() {
+ firstReceiveTime = time.Now()
+ log.Info(fmt.Sprintf("receive first received request time: %s", firstReceiveTime))
+ }
+}
+
+func IsNotReceiveSyncRequest() bool {
+ return firstReceiveTime.IsZero()
+}
+
+func GetFirstReceiveTime() time.Time {
+ return firstReceiveTime
+}
+
func generateFailedResults(events *v1sync.EventList, err error) (*v1sync.Results, error) {
if events == nil || len(events.Events) == 0 {
return &v1sync.Results{Results: map[string]*v1sync.Result{}}, nil
diff --git a/syncer/service/sync/sync.go b/syncer/service/sync/sync.go
index ed795097..0ed5662b 100644
--- a/syncer/service/sync/sync.go
+++ b/syncer/service/sync/sync.go
@@ -19,6 +19,7 @@ package sync
import (
"fmt"
+ "time"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/syncer/service/event"
@@ -26,6 +27,11 @@ import (
"github.com/apache/servicecomb-service-center/syncer/service/task"
)
+const (
+ apiServerStartCheckInterval = 1 * time.Second
+ apiServerStartCheckTimes = 120 // 共检查2分钟
+)
+
func Init() {
err := replicator.Work()
if err != nil {