This is an automated email from the ASF dual-hosted git repository.

lynwee pushed a commit to branch release-v1.0
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v1.0 by this push:
     new a2fffb995 cherry pick feat(framework): add custom pipeline notification service #7920 to v1.0 (#7924)
a2fffb995 is described below

commit a2fffb99557430afe9155e868294b842ee1aa757
Author: Lynwee <[email protected]>
AuthorDate: Tue Aug 20 17:12:32 2024 +0800

    cherry pick feat(framework): add custom pipeline notification service #7920 to v1.0 (#7924)
    
    * feat(framework): add custom pipeline notification service
    
    * fix(e2e): fix errors
---
 backend/server/api/api.go                          |  9 ++++
 backend/server/services/init.go                    |  8 ++++
 backend/server/services/pipeline.go                | 48 +++++++++++++++------
 backend/server/services/pipeline_notification.go   | 49 ++++++++++++++++++++++
 ...tification.go => pipeline_notification_impl.go} | 34 +++++----------
 5 files changed, 113 insertions(+), 35 deletions(-)

diff --git a/backend/server/api/api.go b/backend/server/api/api.go
index 7c0266b61..95a485713 100644
--- a/backend/server/api/api.go
+++ b/backend/server/api/api.go
@@ -59,6 +59,15 @@ func Init() {
        basicRes = services.GetBasicRes()
 }
 
+func InjectCustomService(pipelineNotifier services.PipelineNotificationService) errors.Error {
+       if pipelineNotifier != nil {
+               if err := services.InjectCustomService(pipelineNotifier); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
 // @title  DevLake Swagger API
 // @version 0.1
 // @description  <h2>This is the main page of devlake api</h2>
diff --git a/backend/server/services/init.go b/backend/server/services/init.go
index e75af65bf..df6308fad 100644
--- a/backend/server/services/init.go
+++ b/backend/server/services/init.go
@@ -125,6 +125,14 @@ func Init() {
        registerPluginsMigrationScripts()
 }
 
+func InjectCustomService(pipelineNotifier PipelineNotificationService) errors.Error {
+       if pipelineNotifier == nil {
+               return errors.Default.New("pipeline notifier is nil")
+       }
+       customPipelineNotificationService = pipelineNotifier
+       return nil
+}
+
 var statusLock sync.Mutex
 
 // ExecuteMigration executes all pending migration scripts and initialize services module
diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index 15d930205..f0c7f3133 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -41,7 +41,7 @@ import (
        "golang.org/x/sync/semaphore"
 )
 
-var notificationService *NotificationService
+var defaultNotificationService *DefaultPipelineNotificationService
 var globalPipelineLog = logruslog.Global.Nested("pipeline service")
 var pluginOptionSanitizers = map[string]func(map[string]interface{}){
        "gitextractor": func(options map[string]interface{}) {
@@ -85,7 +85,7 @@ func pipelineServiceInit() {
        var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT")
        var notificationSecret = cfg.GetString("NOTIFICATION_SECRET")
        if strings.TrimSpace(notificationEndpoint) != "" {
-               notificationService = NewNotificationService(notificationEndpoint, notificationSecret)
+               defaultNotificationService = NewDefaultPipelineNotificationService(notificationEndpoint, notificationSecret)
        }
 
        // standalone mode: reset pipeline status
@@ -226,8 +226,10 @@ func GetPipeline(pipelineId uint64, shouldSanitize bool) (*models.Pipeline, erro
        if err != nil {
                return nil, err
        }
-       if err := SanitizePipeline(dbPipeline); err != nil {
-               return nil, errors.Convert(err)
+       if shouldSanitize {
+               if err := SanitizePipeline(dbPipeline); err != nil {
+                       return nil, errors.Convert(err)
+               }
        }
        return dbPipeline, nil
 }
@@ -352,9 +354,26 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
        }
 }
 
+func getProjectName(pipeline *models.Pipeline) (string, errors.Error) {
+       if pipeline == nil {
+               return "", errors.Default.New("pipeline is nil")
+       }
+       blueprintId := pipeline.BlueprintId
+       dbBlueprint := &models.Blueprint{}
+       err := db.First(dbBlueprint, dal.Where("id = ?", blueprintId))
+       if err != nil {
+               if db.IsErrorNotFound(err) {
+                       return "", errors.NotFound.New(fmt.Sprintf("blueprint(id: %d) not found", blueprintId))
+               }
+               return "", errors.Internal.Wrap(err, "error getting the blueprint from database")
+       }
+       return dbBlueprint.ProjectName, nil
+}
+
 // NotifyExternal FIXME ...
 func NotifyExternal(pipelineId uint64) errors.Error {
-       if notificationService == nil {
+       notification := GetPipelineNotificationService()
+       if notification == nil {
                return nil
        }
        // send notification to an external web endpoint
@@ -362,13 +381,18 @@ func NotifyExternal(pipelineId uint64) errors.Error {
        if err != nil {
                return err
        }
-       err = notificationService.PipelineStatusChanged(PipelineNotification{
-               PipelineID: pipeline.ID,
-               CreatedAt:  pipeline.CreatedAt,
-               UpdatedAt:  pipeline.UpdatedAt,
-               BeganAt:    pipeline.BeganAt,
-               FinishedAt: pipeline.FinishedAt,
-               Status:     pipeline.Status,
+       projectName, err := getProjectName(pipeline)
+       if err != nil {
+               return err
+       }
+       err = notification.PipelineStatusChanged(PipelineNotificationParam{
+               ProjectName: projectName,
+               PipelineID:  pipeline.ID,
+               CreatedAt:   pipeline.CreatedAt,
+               UpdatedAt:   pipeline.UpdatedAt,
+               BeganAt:     pipeline.BeganAt,
+               FinishedAt:  pipeline.FinishedAt,
+               Status:      pipeline.Status,
        })
        if err != nil {
                globalPipelineLog.Error(err, "failed to send notification: %v", err)
diff --git a/backend/server/services/pipeline_notification.go b/backend/server/services/pipeline_notification.go
new file mode 100644
index 000000000..2c25c44ce
--- /dev/null
+++ b/backend/server/services/pipeline_notification.go
@@ -0,0 +1,49 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+       "github.com/apache/incubator-devlake/core/errors"
+       "time"
+)
+
+type PipelineNotificationParam struct {
+       ProjectName string
+       PipelineID  uint64
+       CreatedAt   time.Time
+       UpdatedAt   time.Time
+       BeganAt     *time.Time
+       FinishedAt  *time.Time
+       Status      string
+}
+
+type PipelineNotificationService interface {
+       PipelineStatusChanged(params PipelineNotificationParam) errors.Error
+}
+
+var customPipelineNotificationService PipelineNotificationService
+
+func GetPipelineNotificationService() PipelineNotificationService {
+       if customPipelineNotificationService != nil {
+               return customPipelineNotificationService
+       }
+       if defaultNotificationService != nil {
+               return defaultNotificationService
+       }
+       return nil
+}
diff --git a/backend/server/services/notification.go b/backend/server/services/pipeline_notification_impl.go
similarity index 76%
rename from backend/server/services/notification.go
rename to backend/server/services/pipeline_notification_impl.go
index 854a78e53..2a4a096a5 100644
--- a/backend/server/services/notification.go
+++ b/backend/server/services/pipeline_notification_impl.go
@@ -22,46 +22,34 @@ import (
        "encoding/hex"
        "encoding/json"
        "fmt"
-       "io"
-       "net/http"
-       "strings"
-       "time"
-
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/core/utils"
+       "io"
+       "net/http"
+       "strings"
 )
 
-// NotificationService FIXME ...
-type NotificationService struct {
+// DefaultPipelineNotificationService FIXME ...
+type DefaultPipelineNotificationService struct {
        EndPoint string
        Secret   string
 }
 
-// NewNotificationService FIXME ...
-func NewNotificationService(endpoint, secret string) *NotificationService {
-       return &NotificationService{
+// NewDefaultPipelineNotificationService creates a new DefaultPipelineNotificationService
+func NewDefaultPipelineNotificationService(endpoint, secret string) *DefaultPipelineNotificationService {
+       return &DefaultPipelineNotificationService{
                EndPoint: endpoint,
                Secret:   secret,
        }
 }
 
-// PipelineNotification FIXME ...
-type PipelineNotification struct {
-       PipelineID uint64
-       CreatedAt  time.Time
-       UpdatedAt  time.Time
-       BeganAt    *time.Time
-       FinishedAt *time.Time
-       Status     string
-}
-
 // PipelineStatusChanged FIXME ...
-func (n *NotificationService) PipelineStatusChanged(params PipelineNotification) errors.Error {
+func (n *DefaultPipelineNotificationService) PipelineStatusChanged(params PipelineNotificationParam) errors.Error {
        return n.sendNotification(models.NotificationPipelineStatusChanged, params)
 }
 
-func (n *NotificationService) sendNotification(notificationType models.NotificationType, data interface{}) errors.Error {
+func (n *DefaultPipelineNotificationService) sendNotification(notificationType models.NotificationType, data interface{}) errors.Error {
        var dataJson, err = json.Marshal(data)
        if err != nil {
                return errors.Convert(err)
@@ -99,7 +87,7 @@ func (n *NotificationService) sendNotification(notificationType models.Notificat
        return db.Update(notification)
 }
 
-func (n *NotificationService) signature(input, nouce string) string {
+func (n *DefaultPipelineNotificationService) signature(input, nouce string) string {
        sum := sha256.Sum256([]byte(input + n.Secret + nouce))
        return hex.EncodeToString(sum[:])
 }

Reply via email to