This is an automated email from the ASF dual-hosted git repository.
lynwee pushed a commit to branch release-v1.0
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v1.0 by this push:
new a2fffb995 cherry pick feat(framework): add custom pipeline
notification service #7920 to v1.0 (#7924)
a2fffb995 is described below
commit a2fffb99557430afe9155e868294b842ee1aa757
Author: Lynwee <[email protected]>
AuthorDate: Tue Aug 20 17:12:32 2024 +0800
cherry pick feat(framework): add custom pipeline notification service #7920
to v1.0 (#7924)
* feat(framework): add custom pipeline notification service
* fix(e2e): fix errors
---
backend/server/api/api.go | 9 ++++
backend/server/services/init.go | 8 ++++
backend/server/services/pipeline.go | 48 +++++++++++++++------
backend/server/services/pipeline_notification.go | 49 ++++++++++++++++++++++
...tification.go => pipeline_notification_impl.go} | 34 +++++----------
5 files changed, 113 insertions(+), 35 deletions(-)
diff --git a/backend/server/api/api.go b/backend/server/api/api.go
index 7c0266b61..95a485713 100644
--- a/backend/server/api/api.go
+++ b/backend/server/api/api.go
@@ -59,6 +59,15 @@ func Init() {
basicRes = services.GetBasicRes()
}
+func InjectCustomService(pipelineNotifier
services.PipelineNotificationService) errors.Error {
+ if pipelineNotifier != nil {
+ if err := services.InjectCustomService(pipelineNotifier); err
!= nil {
+ return err
+ }
+ }
+ return nil
+}
+
// @title DevLake Swagger API
// @version 0.1
// @description <h2>This is the main page of devlake api</h2>
diff --git a/backend/server/services/init.go b/backend/server/services/init.go
index e75af65bf..df6308fad 100644
--- a/backend/server/services/init.go
+++ b/backend/server/services/init.go
@@ -125,6 +125,14 @@ func Init() {
registerPluginsMigrationScripts()
}
+func InjectCustomService(pipelineNotifier PipelineNotificationService)
errors.Error {
+ if pipelineNotifier == nil {
+ return errors.Default.New("pipeline notifier is nil")
+ }
+ customPipelineNotificationService = pipelineNotifier
+ return nil
+}
+
var statusLock sync.Mutex
// ExecuteMigration executes all pending migration scripts and initialize
services module
diff --git a/backend/server/services/pipeline.go
b/backend/server/services/pipeline.go
index 15d930205..f0c7f3133 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -41,7 +41,7 @@ import (
"golang.org/x/sync/semaphore"
)
-var notificationService *NotificationService
+var defaultNotificationService *DefaultPipelineNotificationService
var globalPipelineLog = logruslog.Global.Nested("pipeline service")
var pluginOptionSanitizers = map[string]func(map[string]interface{}){
"gitextractor": func(options map[string]interface{}) {
@@ -85,7 +85,7 @@ func pipelineServiceInit() {
var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT")
var notificationSecret = cfg.GetString("NOTIFICATION_SECRET")
if strings.TrimSpace(notificationEndpoint) != "" {
- notificationService =
NewNotificationService(notificationEndpoint, notificationSecret)
+ defaultNotificationService =
NewDefaultPipelineNotificationService(notificationEndpoint, notificationSecret)
}
// standalone mode: reset pipeline status
@@ -226,8 +226,10 @@ func GetPipeline(pipelineId uint64, shouldSanitize bool)
(*models.Pipeline, erro
if err != nil {
return nil, err
}
- if err := SanitizePipeline(dbPipeline); err != nil {
- return nil, errors.Convert(err)
+ if shouldSanitize {
+ if err := SanitizePipeline(dbPipeline); err != nil {
+ return nil, errors.Convert(err)
+ }
}
return dbPipeline, nil
}
@@ -352,9 +354,26 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
}
}
+func getProjectName(pipeline *models.Pipeline) (string, errors.Error) {
+ if pipeline == nil {
+ return "", errors.Default.New("pipeline is nil")
+ }
+ blueprintId := pipeline.BlueprintId
+ dbBlueprint := &models.Blueprint{}
+ err := db.First(dbBlueprint, dal.Where("id = ?", blueprintId))
+ if err != nil {
+ if db.IsErrorNotFound(err) {
+ return "",
errors.NotFound.New(fmt.Sprintf("blueprint(id: %d) not found", blueprintId))
+ }
+ return "", errors.Internal.Wrap(err, "error getting the
blueprint from database")
+ }
+ return dbBlueprint.ProjectName, nil
+}
+
// NotifyExternal FIXME ...
func NotifyExternal(pipelineId uint64) errors.Error {
- if notificationService == nil {
+ notification := GetPipelineNotificationService()
+ if notification == nil {
return nil
}
// send notification to an external web endpoint
@@ -362,13 +381,18 @@ func NotifyExternal(pipelineId uint64) errors.Error {
if err != nil {
return err
}
- err = notificationService.PipelineStatusChanged(PipelineNotification{
- PipelineID: pipeline.ID,
- CreatedAt: pipeline.CreatedAt,
- UpdatedAt: pipeline.UpdatedAt,
- BeganAt: pipeline.BeganAt,
- FinishedAt: pipeline.FinishedAt,
- Status: pipeline.Status,
+ projectName, err := getProjectName(pipeline)
+ if err != nil {
+ return err
+ }
+ err = notification.PipelineStatusChanged(PipelineNotificationParam{
+ ProjectName: projectName,
+ PipelineID: pipeline.ID,
+ CreatedAt: pipeline.CreatedAt,
+ UpdatedAt: pipeline.UpdatedAt,
+ BeganAt: pipeline.BeganAt,
+ FinishedAt: pipeline.FinishedAt,
+ Status: pipeline.Status,
})
if err != nil {
globalPipelineLog.Error(err, "failed to send notification: %v",
err)
diff --git a/backend/server/services/pipeline_notification.go
b/backend/server/services/pipeline_notification.go
new file mode 100644
index 000000000..2c25c44ce
--- /dev/null
+++ b/backend/server/services/pipeline_notification.go
@@ -0,0 +1,49 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+ "github.com/apache/incubator-devlake/core/errors"
+ "time"
+)
+
+type PipelineNotificationParam struct {
+ ProjectName string
+ PipelineID uint64
+ CreatedAt time.Time
+ UpdatedAt time.Time
+ BeganAt *time.Time
+ FinishedAt *time.Time
+ Status string
+}
+
+type PipelineNotificationService interface {
+ PipelineStatusChanged(params PipelineNotificationParam) errors.Error
+}
+
+var customPipelineNotificationService PipelineNotificationService
+
+func GetPipelineNotificationService() PipelineNotificationService {
+ if customPipelineNotificationService != nil {
+ return customPipelineNotificationService
+ }
+ if defaultNotificationService != nil {
+ return defaultNotificationService
+ }
+ return nil
+}
diff --git a/backend/server/services/notification.go
b/backend/server/services/pipeline_notification_impl.go
similarity index 76%
rename from backend/server/services/notification.go
rename to backend/server/services/pipeline_notification_impl.go
index 854a78e53..2a4a096a5 100644
--- a/backend/server/services/notification.go
+++ b/backend/server/services/pipeline_notification_impl.go
@@ -22,46 +22,34 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
- "io"
- "net/http"
- "strings"
- "time"
-
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/utils"
+ "io"
+ "net/http"
+ "strings"
)
-// NotificationService FIXME ...
-type NotificationService struct {
+// DefaultPipelineNotificationService FIXME ...
+type DefaultPipelineNotificationService struct {
EndPoint string
Secret string
}
-// NewNotificationService FIXME ...
-func NewNotificationService(endpoint, secret string) *NotificationService {
- return &NotificationService{
+// NewDefaultPipelineNotificationService creates a new
DefaultPipelineNotificationService
+func NewDefaultPipelineNotificationService(endpoint, secret string)
*DefaultPipelineNotificationService {
+ return &DefaultPipelineNotificationService{
EndPoint: endpoint,
Secret: secret,
}
}
-// PipelineNotification FIXME ...
-type PipelineNotification struct {
- PipelineID uint64
- CreatedAt time.Time
- UpdatedAt time.Time
- BeganAt *time.Time
- FinishedAt *time.Time
- Status string
-}
-
// PipelineStatusChanged FIXME ...
-func (n *NotificationService) PipelineStatusChanged(params
PipelineNotification) errors.Error {
+func (n *DefaultPipelineNotificationService) PipelineStatusChanged(params
PipelineNotificationParam) errors.Error {
return n.sendNotification(models.NotificationPipelineStatusChanged,
params)
}
-func (n *NotificationService) sendNotification(notificationType
models.NotificationType, data interface{}) errors.Error {
+func (n *DefaultPipelineNotificationService) sendNotification(notificationType
models.NotificationType, data interface{}) errors.Error {
var dataJson, err = json.Marshal(data)
if err != nil {
return errors.Convert(err)
@@ -99,7 +87,7 @@ func (n *NotificationService)
sendNotification(notificationType models.Notificat
return db.Update(notification)
}
-func (n *NotificationService) signature(input, nouce string) string {
+func (n *DefaultPipelineNotificationService) signature(input, nouce string)
string {
sum := sha256.Sum256([]byte(input + n.Secret + nouce))
return hex.EncodeToString(sum[:])
}