This is an automated email from the ASF dual-hosted git repository. lynwee pushed a commit to branch dev-1 in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 38ffe617429aec573f872afe2dbcd1cbd71784f9 Author: d4x1 <[email protected]> AuthorDate: Tue Aug 20 22:31:40 2024 +0800 feat(framework): add pipeline_id in task context --- backend/server/services/pipeline_runner.go | 3 ++- backend/server/services/task.go | 8 ++++---- backend/server/services/task_runner.go | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/backend/server/services/pipeline_runner.go b/backend/server/services/pipeline_runner.go index b16dc2dc3..5b262b01c 100644 --- a/backend/server/services/pipeline_runner.go +++ b/backend/server/services/pipeline_runner.go @@ -35,11 +35,12 @@ type pipelineRunner struct { } func (p *pipelineRunner) runPipelineStandalone() errors.Error { + ctx := context.WithValue(context.Background(), "pipeline_id", p.pipeline.ID) return runner.RunPipeline( basicRes.ReplaceLogger(p.logger), p.pipeline.ID, func(taskIds []uint64) errors.Error { - return RunTasksStandalone(p.logger, taskIds) + return RunTasksStandalone(ctx, p.logger, taskIds) }, ) } diff --git a/backend/server/services/task.go b/backend/server/services/task.go index a416d88be..d15491d40 100644 --- a/backend/server/services/task.go +++ b/backend/server/services/task.go @@ -175,21 +175,21 @@ func CancelTask(taskId uint64) errors.Error { } // RunTasksStandalone run tasks in parallel -func RunTasksStandalone(parentLogger log.Logger, taskIds []uint64) errors.Error { +func RunTasksStandalone(ctx context.Context, parentLogger log.Logger, taskIds []uint64) errors.Error { if len(taskIds) == 0 { return nil } results := make(chan error) for _, taskId := range taskIds { - go func(id uint64) { + go func(ctx context.Context, id uint64) { taskLog.Info("run task #%d in background ", id) var err errors.Error - taskErr := runTaskStandalone(parentLogger, id) + taskErr := runTaskStandalone(ctx, parentLogger, id) if taskErr != nil { err = errors.Default.Wrap(taskErr, fmt.Sprintf("Error running task %d.", id)) } results <- err - }(taskId) + }(ctx, taskId) } errs := 
make([]error, 0) var err error diff --git a/backend/server/services/task_runner.go b/backend/server/services/task_runner.go index 0ad643bf3..4cd1c0abd 100644 --- a/backend/server/services/task_runner.go +++ b/backend/server/services/task_runner.go @@ -96,13 +96,13 @@ func init() { runningTasks.tasks = make(map[uint64]*RunningTaskData) } -func runTaskStandalone(parentLog log.Logger, taskId uint64) errors.Error { +func runTaskStandalone(ctx context.Context, parentLog log.Logger, taskId uint64) errors.Error { // deferring cleaning up defer func() { _, _ = runningTasks.Remove(taskId) }() // for task cancelling - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) err := runningTasks.Add(taskId, cancel) if err != nil { return err
