This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 426b3d7f3 fix: fix the canceling does not work when SkipOnFail was set to true (#4090)
426b3d7f3 is described below

commit 426b3d7f3470a1633ed6d892647d292983c70ea4
Author: mindlesscloud <[email protected]>
AuthorDate: Wed Jan 4 09:58:28 2023 +0800

    fix: fix the canceling does not work when SkipOnFail was set to true (#4090)
---
 errors/util_test.go                | 51 ++++++++++++++++++++++++++++++++++++++
 plugins/helper/api_collector.go    |  3 ++-
 plugins/helper/worker_scheduler.go |  5 ++++
 runner/run_pipeline.go             |  4 ++-
 services/task.go                   |  5 ++++
 5 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/errors/util_test.go b/errors/util_test.go
new file mode 100644
index 000000000..099ddd424
--- /dev/null
+++ b/errors/util_test.go
@@ -0,0 +1,51 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package errors
+
+import (
+       "context"
+       "testing"
+)
+
+func TestIs(t *testing.T) {
+       type args struct {
+               err    error
+               target error
+       }
+       tests := []struct {
+               name string
+               args args
+               want bool
+       }{
+               {
+                       "",
+                       args{
+                               Default.Wrap(Convert(context.Canceled), "wrap"),
+                               context.Canceled,
+                       },
+                       true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if got := Is(tt.args.err, tt.args.target); got != tt.want {
+                               t.Errorf("Is() = %v, want %v", got, tt.want)
+                       }
+               })
+       }
+}
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index fedda51fb..52511b3ae 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -168,7 +168,8 @@ func (collector *ApiCollector) Execute() errors.Error {
                                        break
                                }
                        }
-                       input, err := iterator.Fetch()
+                       var input interface{}
+                       input, err = iterator.Fetch()
                        if err != nil {
                                break
                        }
diff --git a/plugins/helper/worker_scheduler.go b/plugins/helper/worker_scheduler.go
index 45be63c3f..3d0a9cf90 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -151,6 +151,11 @@ func (s *WorkerScheduler) NextTick(task func() errors.Error) {
 func (s *WorkerScheduler) Wait() errors.Error {
        s.waitGroup.Wait()
        if len(s.workerErrors) > 0 {
+               for _, err := range s.workerErrors {
+                       if errors.Is(err, context.Canceled) {
+                               return errors.Default.Wrap(err, "task canceled")
+                       }
+               }
                return errors.Default.Combine(s.workerErrors)
        }
        return nil
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index 7c0d36cc0..04c964480 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -18,6 +18,7 @@ limitations under the License.
 package runner
 
 import (
+       "context"
        "time"
 
        "github.com/apache/incubator-devlake/errors"
@@ -84,7 +85,8 @@ func runPipelineTasks(
                err = runTasks(row)
                if err != nil {
                        log.Error(err, "run tasks failed")
-                       if !dbPipeline.SkipOnFail {
+                       if errors.Is(err, context.Canceled) || !dbPipeline.SkipOnFail {
+                               log.Info("return error")
                                return err
                        }
                }
diff --git a/services/task.go b/services/task.go
index 7b93897e7..20d3af508 100644
--- a/services/task.go
+++ b/services/task.go
@@ -18,6 +18,7 @@ limitations under the License.
 package services
 
 import (
+       "context"
        "encoding/json"
        "fmt"
        "regexp"
@@ -207,6 +208,10 @@ func RunTasksStandalone(parentLogger core.Logger, taskIds []uint64) errors.Error
                for _, e := range errs {
                        _, _ = sb.WriteString(e.Error())
                        _, _ = sb.WriteString("\n")
+                       if errors.Is(e, context.Canceled) {
+                               parentLogger.Info("task canceled")
+                               return errors.Convert(e)
+                       }
                }
                err = errors.Default.New(sb.String())
        }

Reply via email to