This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch release-v0.15
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.15 by this push:
     new 1bec691fc fix: dbt error not return (#4237) (#4366)
1bec691fc is described below

commit 1bec691fcf0927c8b3c1a2fbf9dbb167d48de6d4
Author: long2ice <[email protected]>
AuthorDate: Thu Feb 9 14:27:54 2023 +0800

    fix: dbt error not return (#4237) (#4366)
---
 plugins/dbt/tasks/convertor.go | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index da4169563..8c3d7ff50 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -45,6 +45,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
        projectVars := data.Options.ProjectVars
        args := data.Options.Args
        failFast := data.Options.FailFast
+       threads := data.Options.Threads
        noVersionCheck := data.Options.NoVersionCheck
        excludeModels := data.Options.ExcludeModels
        selector := data.Options.Selector
@@ -118,7 +119,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
                log.Info("dbt deps run script: ", cmdDeps)
                // prevent zombie process
                defer func() {
-                       if err := errors.Convert(cmdDeps.Wait()); err != nil {
+                       if err = errors.Convert(cmdDeps.Wait()); err != nil {
                                log.Error(nil, "dbt deps run cmd.cmdDeps() error")
                        }
                }()
@@ -147,6 +148,10 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
        if failFast {
                dbtExecParams = append(dbtExecParams, "--fail-fast")
        }
+       if threads != 0 {
+               dbtExecParams = append(dbtExecParams, "--threads")
+               dbtExecParams = append(dbtExecParams, strconv.Itoa(threads))
+       }
        if noVersionCheck {
                dbtExecParams = append(dbtExecParams, "--no-version-check")
        }
@@ -196,17 +201,18 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
        }
 
        // prevent zombie process
+       var errStr string
        defer func() {
-               err := errors.Convert(cmd.Wait())
+               err = errors.Convert(cmd.Wait())
                if err != nil {
                        log.Error(err, "The DBT project run failed!")
+                       err = errors.SubtaskErr.New(errStr)
                } else {
                        log.Info("The DBT project run ended.")
                }
        }()
 
        scanner := bufio.NewScanner(stdout)
-       var errStr string
        for scanner.Scan() {
                line := scanner.Text()
                log.Info(line)
@@ -217,7 +223,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
                        taskCtx.IncProgress(1)
                }
        }
-       if err := errors.Convert(scanner.Err()); err != nil {
+       if err = errors.Convert(scanner.Err()); err != nil {
                log.Error(err, "dbt read stdout failed.")
                return err
        }
@@ -228,7 +234,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
                return errors.Convert(closeErr)
        }
 
-       return nil
+       return err
 }
 
 var DbtConverterMeta = core.SubTaskMeta{

Reply via email to