This is an automated email from the ASF dual-hosted git repository.
warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 426b3d7f3 fix: fix the canceling does not work when SkipOnFail was set
to true (#4090)
426b3d7f3 is described below
commit 426b3d7f3470a1633ed6d892647d292983c70ea4
Author: mindlesscloud <[email protected]>
AuthorDate: Wed Jan 4 09:58:28 2023 +0800
fix: fix the canceling does not work when SkipOnFail was set to true (#4090)
---
errors/util_test.go | 51 ++++++++++++++++++++++++++++++++++++++
plugins/helper/api_collector.go | 3 ++-
plugins/helper/worker_scheduler.go | 5 ++++
runner/run_pipeline.go | 4 ++-
services/task.go | 5 ++++
5 files changed, 66 insertions(+), 2 deletions(-)
diff --git a/errors/util_test.go b/errors/util_test.go
new file mode 100644
index 000000000..099ddd424
--- /dev/null
+++ b/errors/util_test.go
@@ -0,0 +1,51 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package errors
+
+import (
+ "context"
+ "testing"
+)
+
+func TestIs(t *testing.T) {
+ type args struct {
+ err error
+ target error
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ "",
+ args{
+ Default.Wrap(Convert(context.Canceled), "wrap"),
+ context.Canceled,
+ },
+ true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := Is(tt.args.err, tt.args.target); got !=
tt.want {
+ t.Errorf("Is() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index fedda51fb..52511b3ae 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -168,7 +168,8 @@ func (collector *ApiCollector) Execute() errors.Error {
break
}
}
- input, err := iterator.Fetch()
+ var input interface{}
+ input, err = iterator.Fetch()
if err != nil {
break
}
diff --git a/plugins/helper/worker_scheduler.go
b/plugins/helper/worker_scheduler.go
index 45be63c3f..3d0a9cf90 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -151,6 +151,11 @@ func (s *WorkerScheduler) NextTick(task func()
errors.Error) {
func (s *WorkerScheduler) Wait() errors.Error {
s.waitGroup.Wait()
if len(s.workerErrors) > 0 {
+ for _, err := range s.workerErrors {
+ if errors.Is(err, context.Canceled) {
+ return errors.Default.Wrap(err, "task canceled")
+ }
+ }
return errors.Default.Combine(s.workerErrors)
}
return nil
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index 7c0d36cc0..04c964480 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -18,6 +18,7 @@ limitations under the License.
package runner
import (
+ "context"
"time"
"github.com/apache/incubator-devlake/errors"
@@ -84,7 +85,8 @@ func runPipelineTasks(
err = runTasks(row)
if err != nil {
log.Error(err, "run tasks failed")
- if !dbPipeline.SkipOnFail {
+ if errors.Is(err, context.Canceled) ||
!dbPipeline.SkipOnFail {
+ log.Info("return error")
return err
}
}
diff --git a/services/task.go b/services/task.go
index 7b93897e7..20d3af508 100644
--- a/services/task.go
+++ b/services/task.go
@@ -18,6 +18,7 @@ limitations under the License.
package services
import (
+ "context"
"encoding/json"
"fmt"
"regexp"
@@ -207,6 +208,10 @@ func RunTasksStandalone(parentLogger core.Logger, taskIds
[]uint64) errors.Error
for _, e := range errs {
_, _ = sb.WriteString(e.Error())
_, _ = sb.WriteString("\n")
+ if errors.Is(e, context.Canceled) {
+ parentLogger.Info("task canceled")
+ return errors.Convert(e)
+ }
}
err = errors.Default.New(sb.String())
}