This is an automated email from the ASF dual-hosted git repository.

lynwee pushed a commit to branch dev-1
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit 38ffe617429aec573f872afe2dbcd1cbd71784f9
Author: d4x1 <[email protected]>
AuthorDate: Tue Aug 20 22:31:40 2024 +0800

    feat(framework): add pipeline_id in task context
---
 backend/server/services/pipeline_runner.go | 3 ++-
 backend/server/services/task.go            | 8 ++++----
 backend/server/services/task_runner.go     | 4 ++--
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/backend/server/services/pipeline_runner.go b/backend/server/services/pipeline_runner.go
index b16dc2dc3..5b262b01c 100644
--- a/backend/server/services/pipeline_runner.go
+++ b/backend/server/services/pipeline_runner.go
@@ -35,11 +35,12 @@ type pipelineRunner struct {
 }
 
 func (p *pipelineRunner) runPipelineStandalone() errors.Error {
+       ctx := context.WithValue(context.Background(), "pipeline_id", p.pipeline.ID)
        return runner.RunPipeline(
                basicRes.ReplaceLogger(p.logger),
                p.pipeline.ID,
                func(taskIds []uint64) errors.Error {
-                       return RunTasksStandalone(p.logger, taskIds)
+                       return RunTasksStandalone(ctx, p.logger, taskIds)
                },
        )
 }
diff --git a/backend/server/services/task.go b/backend/server/services/task.go
index a416d88be..d15491d40 100644
--- a/backend/server/services/task.go
+++ b/backend/server/services/task.go
@@ -175,21 +175,21 @@ func CancelTask(taskId uint64) errors.Error {
 }
 
 // RunTasksStandalone run tasks in parallel
-func RunTasksStandalone(parentLogger log.Logger, taskIds []uint64) errors.Error {
+func RunTasksStandalone(ctx context.Context, parentLogger log.Logger, taskIds []uint64) errors.Error {
        if len(taskIds) == 0 {
                return nil
        }
        results := make(chan error)
        for _, taskId := range taskIds {
-               go func(id uint64) {
+               go func(ctx context.Context, id uint64) {
                        taskLog.Info("run task #%d in background ", id)
                        var err errors.Error
-                       taskErr := runTaskStandalone(parentLogger, id)
+                       taskErr := runTaskStandalone(ctx, parentLogger, id)
                        if taskErr != nil {
                                err = errors.Default.Wrap(taskErr, fmt.Sprintf("Error running task %d.", id))
                        }
                        results <- err
-               }(taskId)
+               }(ctx, taskId)
        }
        errs := make([]error, 0)
        var err error
diff --git a/backend/server/services/task_runner.go b/backend/server/services/task_runner.go
index 0ad643bf3..4cd1c0abd 100644
--- a/backend/server/services/task_runner.go
+++ b/backend/server/services/task_runner.go
@@ -96,13 +96,13 @@ func init() {
        runningTasks.tasks = make(map[uint64]*RunningTaskData)
 }
 
-func runTaskStandalone(parentLog log.Logger, taskId uint64) errors.Error {
+func runTaskStandalone(ctx context.Context, parentLog log.Logger, taskId uint64) errors.Error {
        // deferring cleaning up
        defer func() {
                _, _ = runningTasks.Remove(taskId)
        }()
        // for task cancelling
-       ctx, cancel := context.WithCancel(context.Background())
+       ctx, cancel := context.WithCancel(ctx)
        err := runningTasks.Add(taskId, cancel)
        if err != nil {
                return err

Reply via email to