This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 4302140d [fix-2940][framework]: Add lazy caching of http codes for error handling and fix any broken Http error messages (#2984)
4302140d is described below

commit 4302140db231daa5e9671fad2985179c9fa58423
Author: Keon Amini <[email protected]>
AuthorDate: Thu Sep 15 05:14:07 2022 -0500

    [fix-2940][framework]: Add lazy caching of http codes for error handling and fix any broken Http error messages (#2984)
    
    * fix: add lazy caching of http codes for error handling
    
    * fix: uplift github test action go version to get build running (also see #2908)
    
    * fix: github workflow adjustments
    
    * refactor: reworked parts of the error interface to handle combining errors better + addressed the issue in #3031
    
    * fix: fixed some error messages
    
    * fix: restored error-400 default error behavior in plugin router
    
    * fix: removed redundant function call
    
    * fix: fixed some post-merge buggy behavior
    
    * refactor: removed pre-merge user-message based implementation
    
    * refactor: minor naming changes
---
 api/router.go                          |   2 +-
 api/shared/api_output.go               |  11 ++--
 errors/errors.go                       |   4 +-
 errors/errors_test.go                  |  17 +++---
 errors/impl.go                         |  69 +++++++++++++---------
 errors/{errors.go => map.go}           |  46 +++++++--------
 errors/message.go                      | 104 +++++++++++++++++++++++++++++++++
 errors/types.go                        |  42 +++++--------
 plugins/gitextractor/gitextractor.go   |   2 +-
 plugins/github/api/connection.go       |  18 +++---
 plugins/helper/graphql_async_client.go |   2 +-
 plugins/helper/worker_scheduler.go     |   2 +-
 runner/run_task.go                     |   4 +-
 services/pipeline_runner.go            |  10 +++-
 services/task.go                       |  14 +++--
 15 files changed, 231 insertions(+), 116 deletions(-)

diff --git a/api/router.go b/api/router.go
index 1a86f767..97fd3d96 100644
--- a/api/router.go
+++ b/api/router.go
@@ -103,7 +103,7 @@ func handlePluginCall(pluginName string, handler 
core.ApiResourceHandler) func(c
                }
                output, err := handler(input)
                if err != nil {
-                       shared.ApiOutputError(c, errors.Default.Wrap(err, 
fmt.Sprintf("error executing the requested resource for plugin %s", 
pluginName)))
+                       shared.ApiOutputError(c, errors.BadInput.Wrap(err, 
fmt.Sprintf("error executing the requested resource for plugin %s", 
pluginName)))
                } else if output != nil {
                        status := output.Status
                        if status < http.StatusContinue {
diff --git a/api/shared/api_output.go b/api/shared/api_output.go
index 69be3ef1..cd0ffc6d 100644
--- a/api/shared/api_output.go
+++ b/api/shared/api_output.go
@@ -29,8 +29,9 @@ import (
 const BadRequestBody = "bad request body format"
 
 type ApiBody struct {
-       Success bool   `json:"success"`
-       Message string `json:"message"`
+       Success bool     `json:"success"`
+       Message string   `json:"message"`
+       Causes  []string `json:"causes"`
 }
 
 type ResponsePipelines struct {
@@ -42,9 +43,11 @@ type ResponsePipelines struct {
 func ApiOutputError(c *gin.Context, err error) {
        if e, ok := err.(errors.Error); ok {
                logger.Global.Error(err, "HTTP %d error", 
e.GetType().GetHttpCode())
+               messages := e.Messages()
                c.JSON(e.GetType().GetHttpCode(), &ApiBody{
                        Success: false,
-                       Message: e.Message(),
+                       Message: messages.Get(),
+                       Causes:  messages.Causes(),
                })
        } else {
                logger.Global.Error(err, "HTTP %d error (native)", 
http.StatusInternalServerError)
@@ -71,7 +74,7 @@ func ApiOutputSuccess(c *gin.Context, body interface{}, 
status int) {
 func ApiOutputAbort(c *gin.Context, err error) {
        if e, ok := err.(errors.Error); ok {
                logger.Global.Error(err, "HTTP %d abort-error", 
e.GetType().GetHttpCode())
-               _ = c.AbortWithError(e.GetType().GetHttpCode(), 
fmt.Errorf(e.Message()))
+               _ = c.AbortWithError(e.GetType().GetHttpCode(), 
fmt.Errorf(e.Messages().Format()))
        } else {
                logger.Global.Error(err, "HTTP %d abort-error (native)", 
http.StatusInternalServerError)
                _ = c.AbortWithError(http.StatusInternalServerError, err)
diff --git a/errors/errors.go b/errors/errors.go
index 5df4909f..ff839670 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -25,8 +25,8 @@ type (
        // Error The interface that all internally managed errors should adhere 
to.
        Error interface {
                requiredSupertype
-               // Message the message associated with this Error.
-               Message() string
+               // Messages the message associated with this Error.
+               Messages() Messages
                // GetType gets the Type of this error
                GetType() *Type
                // As Attempts to cast this Error to the requested Type, and 
returns nil if it can't.
diff --git a/errors/errors_test.go b/errors/errors_test.go
index d241f93e..0d59c95e 100644
--- a/errors/errors_test.go
+++ b/errors/errors_test.go
@@ -35,7 +35,7 @@ func TestCrdbErrorImpl(t *testing.T) {
                require.Equal(t, err.Error(), lakeErr.Error())
        })
        t.Run("raw_message", func(t *testing.T) {
-               msg := lakeErr.Message()
+               msg := lakeErr.Messages().Format()
                require.NotEqual(t, err.Error(), msg)
                fmt.Printf("======================Raw 
Message=======================: \n%s\n\n\n", msg)
                msgParts := strings.Split(msg, "\ncaused by: ")
@@ -59,7 +59,7 @@ func TestCrdbErrorImpl(t *testing.T) {
                require.True(t, errors.Is(lakeErr, os.ErrNotExist))
        })
        t.Run("combine_errors_type", func(t *testing.T) {
-               err = Unauthorized.Combine([]error{err, err}, "combined")
+               err = Unauthorized.Combine([]error{err, err})
                lakeErr = AsLakeErrorType(err)
                require.NotNil(t, lakeErr)
                e := lakeErr.As(Unauthorized)
@@ -78,31 +78,32 @@ func TestCrdbErrorImpl(t *testing.T) {
                baseErr := BadInput.Wrap(rawErr, "wrapped")
                err2 := Convert(baseErr)
                require.Same(t, baseErr, err2)
-               require.Equal(t, "wrapped (400)", err2.Message())
+               require.Equal(t, "wrapped (400)", err2.Messages().Get())
                require.Same(t, rawErr, err2.Unwrap())
                err3 := Default.WrapRaw(baseErr)
                require.NotSame(t, baseErr, err3)
-               require.Equal(t, "wrapped (400)", err3.Message())
+               require.Equal(t, "wrapped (400)", err3.Messages().Get())
+               require.Equal(t, "wrapped (400)", err3.Messages().Get())
                require.Same(t, baseErr, err3.Unwrap())
        })
 }
 
-func f1() error {
+func f1() Error {
        err := f2()
        return Default.Wrap(err, "f1 error")
 }
 
-func f2() error {
+func f2() Error {
        err := f3()
        return NotFound.Wrap(err, "f2 error")
 }
 
-func f3() error {
+func f3() Error {
        err := f4()
        return Default.Wrap(err, "f3 error")
 }
 
-func f4() error {
+func f4() Error {
        err := f5()
        return BadInput.WrapRaw(err)
 }
diff --git a/errors/impl.go b/errors/impl.go
index 2c55f9ce..83d51d2f 100644
--- a/errors/impl.go
+++ b/errors/impl.go
@@ -30,7 +30,7 @@ type (
        crdbErrorImpl struct {
                wrappedRaw error
                wrapped    *crdbErrorImpl
-               msg        string
+               msg        *errMessage
                data       interface{}
                t          *Type
        }
@@ -56,17 +56,10 @@ func (e *crdbErrorImpl) Error() string {
        return parts[1]
 }
 
-func (e *crdbErrorImpl) Message() string {
-       return strings.Join(e.getMessages(func(err *crdbErrorImpl) string {
-               if err.msg == "" {
-                       return ""
-               }
-               code := ""
-               if err.t.httpCode != 0 {
-                       code = fmt.Sprintf("(%d)", err.t.httpCode)
-               }
-               return err.msg + " " + code
-       }), "\ncaused by: ")
+func (e *crdbErrorImpl) Messages() Messages {
+       return e.getMessages(func(err *crdbErrorImpl) *errMessage {
+               return err.msg
+       })
 }
 
 func (e *crdbErrorImpl) Unwrap() error {
@@ -98,13 +91,13 @@ func (e *crdbErrorImpl) As(t *Type) Error {
        }
 }
 
-func (e *crdbErrorImpl) getMessages(getMessage func(*crdbErrorImpl) string) 
[]string {
-       msgs := []string{}
+func (e *crdbErrorImpl) getMessages(getMessage func(*crdbErrorImpl) 
*errMessage) []*errMessage {
+       msgs := []*errMessage{}
        err := e
        ok := false
        for {
                msg := getMessage(err)
-               if msg != "" {
+               if len(msg.msgs) > 0 {
                        msgs = append(msgs, msg)
                }
                unwrapped := err.Unwrap()
@@ -120,41 +113,63 @@ func (e *crdbErrorImpl) getMessages(getMessage 
func(*crdbErrorImpl) string) []st
        return msgs
 }
 
-func newCrdbError(t *Type, err error, message string, opts ...Option) 
*crdbErrorImpl {
+func newSingleCrdbError(t *Type, err error, message string, opts ...Option) 
Error {
        cfg := &options{}
        for _, opt := range opts {
                opt(cfg)
        }
+       cfg.stackOffset += 1
+       msg := &errMessage{}
+       if cast, ok := err.(*crdbErrorImpl); ok {
+               if t == Default { // inherit wrapped error's type
+                       t = cast.GetType()
+               }
+       }
+       msg.addMessage(t, message)
+       return newCrdbError(t, err, msg, cfg)
+}
+
+func newCombinedCrdbError(t *Type, errs []error) Error {
+       msg := &errMessage{}
+       for _, e := range errs {
+               if le, ok := e.(*crdbErrorImpl); ok {
+                       msg.appendMessage(le.msg.getMessage())
+               } else {
+                       msg.appendMessage(e.Error())
+               }
+       }
+       opts := &options{}
+       opts.stackOffset += 1
+       return newCrdbError(t, nil, msg, opts)
+}
+
+func newCrdbError(t *Type, err error, msg *errMessage, opts *options) 
*crdbErrorImpl {
        errType := t
        var wrappedErr *crdbErrorImpl
        var wrappedRaw error
-       rawMessage := message
-       cfg.stackOffset += 2
+       opts.stackOffset += 2
        if err == nil {
                if enableStacktraces {
-                       wrappedRaw = cerror.NewWithDepth(int(cfg.stackOffset), 
rawMessage)
+                       wrappedRaw = cerror.NewWithDepth(int(opts.stackOffset), 
msg.getPrettifiedMessage())
                } else {
-                       wrappedRaw = errors.New(message)
+                       wrappedRaw = errors.New(msg.getPrettifiedMessage())
                }
        } else {
                if cast, ok := err.(*crdbErrorImpl); ok {
                        err = cast.wrappedRaw
                        wrappedErr = cast
-                       if t == Default { // inherit wrapped error's type
-                               errType = cast.GetType()
-                       }
                }
                if enableStacktraces {
-                       wrappedRaw = cerror.WrapWithDepth(int(cfg.stackOffset), 
err, rawMessage)
+                       wrappedRaw = 
cerror.WrapWithDepth(int(opts.stackOffset), err, msg.getPrettifiedMessage())
                } else {
-                       wrappedRaw = cerror.WithDetail(err, rawMessage)
+                       wrappedRaw = cerror.WithDetail(err, 
msg.getPrettifiedMessage())
                }
        }
        impl := &crdbErrorImpl{
                wrappedRaw: wrappedRaw,
                wrapped:    wrappedErr,
-               msg:        rawMessage,
-               data:       cfg.data,
+               msg:        msg,
+               data:       opts.data,
                t:          errType,
        }
        return impl
diff --git a/errors/errors.go b/errors/map.go
similarity index 51%
copy from errors/errors.go
copy to errors/map.go
index 5df4909f..0628ab96 100644
--- a/errors/errors.go
+++ b/errors/map.go
@@ -17,32 +17,28 @@ limitations under the License.
 
 package errors
 
-type (
-       requiredSupertype interface {
-               error
-               Unwrap() error
-       }
-       // Error The interface that all internally managed errors should adhere 
to.
-       Error interface {
-               requiredSupertype
-               // Message the message associated with this Error.
-               Message() string
-               // GetType gets the Type of this error
-               GetType() *Type
-               // As Attempts to cast this Error to the requested Type, and 
returns nil if it can't.
-               As(*Type) Error
-               // GetData returns the data associated with this Error (may be 
nil)
-               GetData() interface{}
-       }
-)
+import "sync"
+
+// Wraps the native sync map in generic form. Consider moving this to another 
package for broader use later
+type syncMap[K any, V any] struct {
+       m *sync.Map
+}
 
-// AsLakeErrorType attempts to cast err to Error, otherwise returns nil
-func AsLakeErrorType(err error) Error {
-       if cast, ok := err.(Error); ok {
-               return cast
+func newSyncMap[K any, V any]() *syncMap[K, V] {
+       return &syncMap[K, V]{
+               m: new(sync.Map),
        }
-       return nil
 }
 
-var _ error = (Error)(nil)
-var _ requiredSupertype = (Error)(nil)
+func (sm *syncMap[K, V]) Store(key K, val V) {
+       sm.m.Store(key, val)
+}
+
+func (sm *syncMap[K, V]) Load(key K) (V, bool) {
+       var v V
+       val, ok := sm.m.Load(key)
+       if ok {
+               v = val.(V)
+       }
+       return v, ok
+}
diff --git a/errors/message.go b/errors/message.go
new file mode 100644
index 00000000..1a645b85
--- /dev/null
+++ b/errors/message.go
@@ -0,0 +1,104 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package errors
+
+import (
+       "fmt"
+       "strings"
+)
+
+type (
+       // Messages alias for messages of an Error
+       Messages []*errMessage
+       // MessageType the type of message for an Error
+       MessageType int
+
+       // errMessage an abstraction around a given Error's message
+       errMessage struct {
+               // all the messages associated with an Error. The size will be 
> 1 if the Error is created using Type.Combine
+               msgs []string
+       }
+)
+
+func (m *errMessage) addMessage(t *Type, msg string) {
+       if msg == "" {
+               return
+       }
+       if t.httpCode != 0 {
+               msg = fmt.Sprintf("%s (%d)", msg, t.httpCode)
+       }
+       m.appendMessage(msg)
+}
+
+func (m *errMessage) appendMessage(msg string) {
+       m.msgs = append(m.msgs, msg)
+}
+
+func (m *errMessage) getMessage() string {
+       if len(m.msgs) == 0 {
+               return ""
+       }
+       return strings.Join(m.msgs, ",")
+}
+
+func (m *errMessage) getPrettifiedMessage() string {
+       if len(m.msgs) == 0 {
+               return ""
+       }
+       if len(m.msgs) == 1 {
+               return m.msgs[0]
+       }
+       effectiveMsg := strings.Join(m.msgs, "\n=====================\n")
+       effectiveMsg = "\t" + strings.ReplaceAll(effectiveMsg, "\n", "\n\t")
+       return fmt.Sprintf("\ncombined messages: \n{\n%s\n}", effectiveMsg)
+}
+
+// Format formats the messages into a single string
+func (m Messages) Format() string {
+       msgs := []string{}
+       for _, m := range m {
+               if msg := m.getMessage(); msg != "" {
+                       msgs = append(msgs, msg)
+               }
+       }
+       return strings.Join(msgs, "\ncaused by: ")
+}
+
+// Get gets the main (top-level) (or first non-empty message if exists) 
message of the Messages
+func (m Messages) Get() string {
+       for _, m := range m {
+               if msg := m.getMessage(); msg != "" {
+                       return msg
+               }
+       }
+       return ""
+}
+
+// Causes gets the non-main messages of the Messages in causal sequence
+func (m Messages) Causes() []string {
+       if len(m) < 2 {
+               return nil
+       }
+       causes := []string{}
+       for _, m := range m[1:] {
+               if msg := m.getMessage(); msg != "" {
+                       causes = append(causes, msg)
+               }
+       }
+       return causes
+}
diff --git a/errors/types.go b/errors/types.go
index 981ff64a..06bd82dd 100644
--- a/errors/types.go
+++ b/errors/types.go
@@ -20,7 +20,6 @@ package errors
 import (
        "fmt"
        "net/http"
-       "strings"
 )
 
 // Supported error types
@@ -37,7 +36,7 @@ var (
        Timeout      = register(&Type{httpCode: http.StatusGatewayTimeout, 
meta: "timeout"})
 
        //cached values
-       typesByHttpCode = map[int]*Type{}
+       typesByHttpCode = newSyncMap[int, *Type]()
 )
 
 type (
@@ -58,24 +57,22 @@ type (
 )
 
 func HttpStatus(code int) *Type {
-       t, ok := typesByHttpCode[code]
-       if !ok {
-               t = Internal
+       t, ok := typesByHttpCode.Load(code)
+       if !ok { // lazily cache any missing codes
+               t = &Type{httpCode: code, meta: fmt.Sprintf("type_http_%d", 
code)}
+               typesByHttpCode.Store(code, t)
        }
        return t
 }
 
 // New constructs a new Error instance with this message
 func (t *Type) New(message string, opts ...Option) Error {
-       return newCrdbError(t, nil, message, opts...)
+       return newSingleCrdbError(t, nil, message, opts...)
 }
 
 // Wrap constructs a new Error instance with this message and wraps the passed 
in error. A nil 'err' will return a nil Error.
 func (t *Type) Wrap(err error, message string, opts ...Option) Error {
-       if err == nil {
-               return nil
-       }
-       return newCrdbError(t, err, message, opts...)
+       return newSingleCrdbError(t, err, message, opts...)
 }
 
 // WrapRaw constructs a new Error instance that directly wraps this error with 
no additional context. A nil 'err' will return a nil Error.
@@ -103,24 +100,13 @@ func (t *Type) wrapRaw(err error, forceWrap bool, opts 
...Option) Error {
        } else {
                msg = err.Error()
        }
-       return newCrdbError(t, err, msg, opts...)
+       return newSingleCrdbError(t, err, msg, opts...)
 }
 
-// Combine constructs a new Error from combining multiple errors. Stacktrace 
info for each of the errors will not be present in the result.
-func (t *Type) Combine(errs []error, msg string, opts ...Option) Error {
-       msgs := []string{}
-       for _, e := range errs {
-               if le := AsLakeErrorType(e); le != nil {
-                       if msg0 := le.Message(); msg0 != "" {
-                               msgs = append(msgs, le.Message())
-                       }
-               } else {
-                       msgs = append(msgs, e.Error())
-               }
-       }
-       effectiveMsg := strings.Join(msgs, "\n=====================\n")
-       effectiveMsg = "\t" + strings.ReplaceAll(effectiveMsg, "\n", "\n\t")
-       return newCrdbError(t, nil, fmt.Sprintf("%s\ncombined messages: 
\n{\n%s\n}", msg, effectiveMsg), opts...)
+// Combine constructs a new Error from combining multiple errors. Stacktrace 
info for each of the errors will not be present in the result, so it's
+// best to log the errors before combining them.
+func (t *Type) Combine(errs []error) Error {
+       return newCombinedCrdbError(t, errs)
 }
 
 // GetHttpCode gets the associated Http code with this Type, if explicitly 
set, otherwise http.StatusInternalServerError
@@ -149,9 +135,9 @@ func withStackOffset(offset uint) Option {
 func register(t *Type) *Type {
        if t == nil {
                t = &Type{meta: "default"}
-               typesByHttpCode[t.httpCode] = t
+               typesByHttpCode.Store(t.httpCode, t)
        } else if t.httpCode != 0 {
-               typesByHttpCode[t.httpCode] = t
+               typesByHttpCode.Store(t.httpCode, t)
        }
        return t
 }
diff --git a/plugins/gitextractor/gitextractor.go 
b/plugins/gitextractor/gitextractor.go
index 0e9793f4..a0ac527a 100644
--- a/plugins/gitextractor/gitextractor.go
+++ b/plugins/gitextractor/gitextractor.go
@@ -54,7 +54,7 @@ func (plugin GitExtractor) SubTaskMetas() []core.SubTaskMeta {
 // based on task context and user input options, return data that shared among 
all subtasks
 func (plugin GitExtractor) PrepareTaskData(taskCtx core.TaskContext, options 
map[string]interface{}) (interface{}, errors.Error) {
        var op tasks.GitExtractorOptions
-       if err := helper.Decode(options, op, nil); err != nil {
+       if err := helper.Decode(options, &op, nil); err != nil {
                return nil, err
        }
        if err := op.Valid(); err != nil {
diff --git a/plugins/github/api/connection.go b/plugins/github/api/connection.go
index 046a4bfb..973530a1 100644
--- a/plugins/github/api/connection.go
+++ b/plugins/github/api/connection.go
@@ -57,7 +57,7 @@ func TestConnection(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, erro
 
        // verify multiple token in parallel
        type VerifyResult struct {
-               err   error
+               err   errors.Error
                login string
        }
        results := make(chan VerifyResult)
@@ -76,21 +76,21 @@ func TestConnection(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, erro
                                basicRes,
                        )
                        if err != nil {
-                               results <- VerifyResult{err: 
errors.Default.Wrap(err, fmt.Sprintf("verify token failed for #%d %s", j, 
token))}
+                               results <- VerifyResult{err: 
errors.BadInput.Wrap(err, fmt.Sprintf("verify token failed for #%d %s", j, 
token))}
                                return
                        }
                        res, err := apiClient.Get("user", nil, nil)
                        if err != nil {
-                               results <- VerifyResult{err: 
errors.Default.Wrap(err, fmt.Sprintf("verify token failed for #%d %s", j, 
token))}
+                               results <- VerifyResult{err: 
errors.HttpStatus(res.StatusCode).Wrap(err, fmt.Sprintf("verify token failed 
for #%d %s", j, token))}
                                return
                        }
                        githubUserOfToken := &models.GithubUserOfToken{}
                        err = helper.UnmarshalResponse(res, githubUserOfToken)
                        if err != nil {
-                               results <- VerifyResult{err: 
errors.Default.Wrap(err, fmt.Sprintf("verify token failed for #%v %s", j, 
token))}
+                               results <- VerifyResult{err: 
errors.BadInput.Wrap(err, fmt.Sprintf("verify token failed for #%v %s", j, 
token))}
                                return
                        } else if githubUserOfToken.Login == "" {
-                               results <- VerifyResult{err: 
errors.Default.Wrap(err, fmt.Sprintf("invalid token for #%v %s", j, token))}
+                               results <- VerifyResult{err: 
errors.BadInput.Wrap(err, fmt.Sprintf("invalid token for #%v %s", j, token))}
                                return
                        }
                        results <- VerifyResult{login: githubUserOfToken.Login}
@@ -99,11 +99,11 @@ func TestConnection(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, erro
 
        // collect verification results
        logins := make([]string, 0)
-       msgs := make([]string, 0)
+       allErrors := make([]error, 0)
        i := 0
        for result := range results {
                if result.err != nil {
-                       msgs = append(msgs, result.err.Error())
+                       allErrors = append(allErrors, result.err)
                }
                logins = append(logins, result.login)
                i++
@@ -111,8 +111,8 @@ func TestConnection(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, erro
                        close(results)
                }
        }
-       if len(msgs) > 0 {
-               return nil, errors.Default.New(strings.Join(msgs, "\n"))
+       if len(allErrors) > 0 {
+               return nil, errors.Default.Combine(allErrors)
        }
 
        githubApiResponse := GithubTestConnResponse{}
diff --git a/plugins/helper/graphql_async_client.go 
b/plugins/helper/graphql_async_client.go
index 43068b80..91be1728 100644
--- a/plugins/helper/graphql_async_client.go
+++ b/plugins/helper/graphql_async_client.go
@@ -147,7 +147,7 @@ func (apiClient *GraphqlAsyncClient) NextTick(task func() 
errors.Error) {
 func (apiClient *GraphqlAsyncClient) Wait() errors.Error {
        apiClient.waitGroup.Wait()
        if len(apiClient.workerErrors) > 0 {
-               return errors.Default.Combine(apiClient.workerErrors, "graphql 
workers encountered error(s)")
+               return errors.Default.Combine(apiClient.workerErrors)
        }
        return nil
 }
diff --git a/plugins/helper/worker_scheduler.go 
b/plugins/helper/worker_scheduler.go
index 8a181bfb..e74f228f 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -152,7 +152,7 @@ func (s *WorkerScheduler) NextTick(task func() 
errors.Error) {
 func (s *WorkerScheduler) Wait() errors.Error {
        s.waitGroup.Wait()
        if len(s.workerErrors) > 0 {
-               return errors.Default.Combine(s.workerErrors, "worker scheduler 
captured these errors")
+               return errors.Default.Combine(s.workerErrors)
        }
        return nil
 }
diff --git a/runner/run_task.go b/runner/run_task.go
index 116e6169..28124877 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -70,10 +70,12 @@ func RunTask(
                                if meta, ok := 
lakeErr.GetData().(*core.SubTaskMeta); ok {
                                        subTaskName = meta.Name
                                }
+                       } else {
+                               lakeErr = errors.Convert(err)
                        }
                        dbe := db.Model(task).Updates(map[string]interface{}{
                                "status":          models.TASK_FAILED,
-                               "message":         err.Error(),
+                               "message":         lakeErr.Messages().Format(),
                                "finished_at":     finishedAt,
                                "spent_seconds":   spentSeconds,
                                "failed_sub_task": subTaskName,
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index f40746e0..5ab5c4c6 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -113,11 +113,11 @@ func runPipeline(pipelineId uint64) errors.Error {
                err = pipelineRun.runPipelineStandalone()
        }
        if err != nil {
-               err = errors.Default.Wrap(err, fmt.Sprintf("error running 
pipeline %d", pipelineId))
+               err = errors.Default.Wrap(err, fmt.Sprintf("Error running 
pipeline %d.", pipelineId))
        }
        pipeline, e := GetPipeline(pipelineId)
        if e != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("unable to get 
pipeline %d", pipelineId))
+               return errors.Default.Wrap(err, fmt.Sprintf("Unable to get 
pipeline %d.", pipelineId))
        }
        // finished, update database
        finishedAt := time.Now()
@@ -125,7 +125,11 @@ func runPipeline(pipelineId uint64) errors.Error {
        pipeline.SpentSeconds = int(finishedAt.Unix() - pipeline.BeganAt.Unix())
        if err != nil {
                pipeline.Status = models.TASK_FAILED
-               pipeline.Message = err.Error()
+               if lakeErr := errors.AsLakeErrorType(err); lakeErr != nil {
+                       pipeline.Message = lakeErr.Messages().Format()
+               } else {
+                       pipeline.Message = err.Error()
+               }
        } else {
                pipeline.Status = models.TASK_COMPLETED
                pipeline.Message = ""
diff --git a/services/task.go b/services/task.go
index fd5f7a6a..2268956d 100644
--- a/services/task.go
+++ b/services/task.go
@@ -24,7 +24,6 @@ import (
        "github.com/apache/incubator-devlake/errors"
        "regexp"
        "strconv"
-       "strings"
        "sync"
 
        "github.com/apache/incubator-devlake/logger"
@@ -225,16 +224,21 @@ func runTasksStandalone(parentLogger core.Logger, taskIds 
[]uint64) errors.Error
                taskId := taskId
                go func() {
                        taskLog.Info("run task in background ", taskId)
-                       results <- runTaskStandalone(parentLogger, taskId)
+                       var err errors.Error
+                       taskErr := runTaskStandalone(parentLogger, taskId)
+                       if taskErr != nil {
+                               err = errors.Default.Wrap(taskErr, 
fmt.Sprintf("Error running task %d.", taskId))
+                       }
+                       results <- err
                }()
        }
-       errs := make([]string, 0)
+       errs := make([]error, 0)
        var err error
        finished := 0
        for err = range results {
                if err != nil {
                        taskLog.Error(err, "task failed")
-                       errs = append(errs, err.Error())
+                       errs = append(errs, err)
                }
                finished++
                if finished == len(taskIds) {
@@ -242,7 +246,7 @@ func runTasksStandalone(parentLogger core.Logger, taskIds 
[]uint64) errors.Error
                }
        }
        if len(errs) > 0 {
-               err = errors.Default.New(strings.Join(errs, "\n"))
+               err = errors.Default.Combine(errs)
        }
        return errors.Convert(err)
 }

Reply via email to