This is an automated email from the ASF dual-hosted git repository.
zhangning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new b266ff35e feat: add ready and health probes (#7195)
b266ff35e is described below
commit b266ff35ede95b81373c0babc1cdfe7a06f91d4b
Author: Klesh Wong <[email protected]>
AuthorDate: Wed Mar 20 18:19:10 2024 +0800
feat: add ready and health probes (#7195)
---
backend/core/errors/types.go | 5 ++--
backend/server/api/api.go | 3 +-
backend/server/api/ping/ping.go | 34 +++++++++++++++++++++++
backend/server/services/init.go | 11 ++++++++
backend/server/services/probes.go | 58 +++++++++++++++++++++++++++++++++++++++
5 files changed, 108 insertions(+), 3 deletions(-)
diff --git a/backend/core/errors/types.go b/backend/core/errors/types.go
index a4af60ef2..c82a913c4 100644
--- a/backend/core/errors/types.go
+++ b/backend/core/errors/types.go
@@ -36,8 +36,9 @@ var (
Conflict = register(&Type{httpCode: http.StatusConflict, meta:
"internal"})
//500+
- Internal = register(&Type{httpCode: http.StatusInternalServerError,
meta: "internal"})
- Timeout = register(&Type{httpCode: http.StatusGatewayTimeout, meta:
"timeout"})
+ Internal = register(&Type{httpCode: http.StatusInternalServerError,
meta: "internal"})
+ Timeout = register(&Type{httpCode: http.StatusGatewayTimeout, meta:
"timeout"})
+ Unavailable = register(&Type{httpCode: http.StatusServiceUnavailable,
meta: "unavailable"})
//cached values
typesByHttpCode = newSyncMap[int, *Type]()
diff --git a/backend/server/api/api.go b/backend/server/api/api.go
index e04e9e678..7f17b5dc7 100644
--- a/backend/server/api/api.go
+++ b/backend/server/api/api.go
@@ -78,7 +78,8 @@ func CreateApiServer() *gin.Engine {
// For both protected and unprotected routes
router.GET("/ping", ping.Get)
- router.GET("/health", ping.Get)
+ router.GET("/ready", ping.Ready)
+ router.GET("/health", ping.Health)
router.GET("/version", version.Get)
// Api keys
diff --git a/backend/server/api/ping/ping.go b/backend/server/api/ping/ping.go
index 0ff0b5142..4da7652a9 100644
--- a/backend/server/api/ping/ping.go
+++ b/backend/server/api/ping/ping.go
@@ -20,6 +20,8 @@ package ping
import (
"net/http"
+ "github.com/apache/incubator-devlake/server/api/shared"
+ "github.com/apache/incubator-devlake/server/services"
"github.com/gin-gonic/gin"
)
@@ -33,3 +35,35 @@ import (
func Get(c *gin.Context) {
c.Status(http.StatusOK)
}
+
+// @Summary Ready
+// @Description check if service is ready
+// @Tags framework/ping
+// @Success 200
+// @Failure 400 {string} errcode.Error "Bad Request"
+// @Failure 500 {string} errcode.Error "Internal Error"
+// @Router /ready [get]
+func Ready(c *gin.Context) {
+ status, err := services.Ready()
+ if err != nil {
+ shared.ApiOutputError(c, err)
+ return
+ }
+ shared.ApiOutputSuccess(c, shared.ApiBody{Success: true, Message:
status}, http.StatusOK)
+}
+
+// @Summary Health
+// @Description check if service is healthy
+// @Tags framework/ping
+// @Success 200
+// @Failure 400 {string} errcode.Error "Bad Request"
+// @Failure 500 {string} errcode.Error "Internal Error"
+// @Router /health [get]
+func Health(c *gin.Context) {
+ msg, err := services.Health()
+ if err != nil {
+ shared.ApiOutputError(c, err)
+ return
+ }
+ shared.ApiOutputSuccess(c, shared.ApiBody{Success: true, Message: msg},
http.StatusOK)
+}
diff --git a/backend/server/services/init.go b/backend/server/services/init.go
index d332c181a..2d5dad010 100644
--- a/backend/server/services/init.go
+++ b/backend/server/services/init.go
@@ -42,7 +42,14 @@ var basicRes context.BasicRes
var migrator plugin.Migrator
var cronManager *cron.Cron
var vld *validator.Validate
+var serviceStatus string
+const (
+ SERVICE_STATUS_INIT = "initializing"
+ SERVICE_STATUS_WAIT_CONFIRM = "waiting for migration confirmation"
+ SERVICE_STATUS_MIGRATING = "migrating"
+ SERVICE_STATUS_READY = "ready"
+)
const failToCreateCronJob = "created cron job failed"
// InitResources creates resources needed by services module
@@ -50,6 +57,7 @@ func InitResources() {
var err error
// basic resources initialization
+ serviceStatus = SERVICE_STATUS_INIT
vld = validator.New()
basicRes = runner.CreateAppBasicRes()
cfg = basicRes.GetConfigReader()
@@ -98,6 +106,7 @@ func Init() {
errors.Must(ExecuteMigration())
logger.Info("db migration without confirmation")
} else {
+ serviceStatus = SERVICE_STATUS_WAIT_CONFIRM
logger.Info("db migration confirmation needed")
}
} else {
@@ -108,6 +117,7 @@ func Init() {
// ExecuteMigration executes all pending migration scripts and initialize
services module
func ExecuteMigration() errors.Error {
+ serviceStatus = SERVICE_STATUS_MIGRATING
// apply all pending migration scripts
err := migrator.Execute()
if err != nil {
@@ -120,6 +130,7 @@ func ExecuteMigration() errors.Error {
// initialize pipeline server, mainly to start the pipeline consuming
process
pipelineServiceInit()
+ serviceStatus = SERVICE_STATUS_READY
return nil
}
diff --git a/backend/server/services/probes.go
b/backend/server/services/probes.go
new file mode 100644
index 000000000..c799bc8c1
--- /dev/null
+++ b/backend/server/services/probes.go
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+ "time"
+
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/models"
+)
+
+// Ready returns the readiness status of the service
+func Ready() (string, errors.Error) {
+ var err errors.Error
+ if serviceStatus != SERVICE_STATUS_READY {
+ err = errors.Unavailable.New("service is not ready: " +
serviceStatus)
+ }
+ return serviceStatus, err
+}
+
+// Health returns the health status of the service
+func Health() (string, errors.Error) {
+ // return a nil error unless we are 100% sure that the service is
unhealthy
+ if serviceStatus != SERVICE_STATUS_READY {
+ return "maybe", nil
+ }
+ // cover the cases #5711, #6685 that we ran into in the past
+ // it is healthy if we could read one record from the pipelines table
in 5 seconds
+ result := make(chan errors.Error, 1)
+ go func() {
+ result <- db.All(&models.Pipeline{}, dal.Limit(1))
+ }()
+ select {
+ case <-time.After(5 * time.Second):
+ return "timeouted", errors.Default.New("timeout reading from
pipelines")
+ case err := <-result:
+ if err != nil {
+ return "bad", err
+ }
+ return "good", nil
+ }
+}