This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new e630f8d64 fix: dbt zombie process, set dbt default threads=1 (#4132)
e630f8d64 is described below

commit e630f8d645108a2ce76c1d0475d6ede05c35f285
Author: abeizn <[email protected]>
AuthorDate: Fri Jan 6 16:30:10 2023 +0800

    fix: dbt zombie process, set dbt default threads=1 (#4132)
---
 plugins/dbt/dbt.go             |  2 --
 plugins/dbt/tasks/convertor.go | 50 ++++++++++++++++++++++++++++++++++--------
 2 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/plugins/dbt/dbt.go b/plugins/dbt/dbt.go
index 910567b28..0dd882180 100644
--- a/plugins/dbt/dbt.go
+++ b/plugins/dbt/dbt.go
@@ -38,7 +38,6 @@ func main() {
        failFast := dbtCmd.Flags().BoolP("failFast", "", false, "dbt fail fast")
        profilesPath := dbtCmd.Flags().StringP("profilesPath", "", 
"/Users/abeizn/.dbt", "dbt profiles path")
        profile := dbtCmd.Flags().StringP("profile", "", "default", "dbt 
profile")
-       threads := dbtCmd.Flags().IntP("threads", "", 1, "dbt threads")
        noVersionCheck := dbtCmd.Flags().BoolP("noVersionCheck", "", false, 
"dbt no version check")
        excludeModels := dbtCmd.Flags().StringSliceP("excludeModels", "", 
[]string{}, "dbt exclude models")
        selector := dbtCmd.Flags().StringP("selector", "", "", "dbt selector")
@@ -68,7 +67,6 @@ func main() {
                        "failFast":       *failFast,
                        "profilesPath":   *profilesPath,
                        "profile":        *profile,
-                       "threads":        *threads,
                        "noVersionCheck": *noVersionCheck,
                        "excludeModels":  *excludeModels,
                        "selector":       *selector,
diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 63a92ecd0..da4169563 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+       "bufio"
        "encoding/json"
        "net"
        "net/url"
@@ -44,7 +45,6 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
        projectVars := data.Options.ProjectVars
        args := data.Options.Args
        failFast := data.Options.FailFast
-       threads := data.Options.Threads
        noVersionCheck := data.Options.NoVersionCheck
        excludeModels := data.Options.ExcludeModels
        selector := data.Options.Selector
@@ -127,7 +127,8 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error 
{
                }
 
        }
-       dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath}
+       // set default threads = 1 to prevent dbt threads from failing to 
release, which causes zombie processes
+       dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath, 
"--threads", "1"}
        if projectVars != nil {
                jsonProjectVars, err := json.Marshal(projectVars)
                if err != nil {
@@ -146,10 +147,6 @@ func DbtConverter(taskCtx core.SubTaskContext) 
errors.Error {
        if failFast {
                dbtExecParams = append(dbtExecParams, "--fail-fast")
        }
-       if threads != 0 {
-               dbtExecParams = append(dbtExecParams, "--threads")
-               dbtExecParams = append(dbtExecParams, strconv.Itoa(threads))
-       }
        if noVersionCheck {
                dbtExecParams = append(dbtExecParams, "--no-version-check")
        }
@@ -189,12 +186,47 @@ func DbtConverter(taskCtx core.SubTaskContext) 
errors.Error {
        cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
        log.Info("dbt run script: ", cmd)
 
-       stdout, stdoutErr := cmd.CombinedOutput()
+       stdout, stdoutErr := cmd.StdoutPipe()
        if stdoutErr != nil {
                return errors.Convert(stdoutErr)
        }
-       log.Info("dbt run log: ", string(stdout))
-       log.Info("End of dbt program execution.")
+
+       if err = errors.Convert(cmd.Start()); err != nil {
+               return err
+       }
+
+       // prevent zombie process
+       defer func() {
+               err := errors.Convert(cmd.Wait())
+               if err != nil {
+                       log.Error(err, "The DBT project run failed!")
+               } else {
+                       log.Info("The DBT project run ended.")
+               }
+       }()
+
+       scanner := bufio.NewScanner(stdout)
+       var errStr string
+       for scanner.Scan() {
+               line := scanner.Text()
+               log.Info(line)
+               if strings.Contains(line, "Encountered an error") || errStr != 
"" {
+                       errStr += line + "\n"
+               }
+               if strings.Contains(line, "of") && strings.Contains(line, "OK") 
{
+                       taskCtx.IncProgress(1)
+               }
+       }
+       if err := errors.Convert(scanner.Err()); err != nil {
+               log.Error(err, "dbt read stdout failed.")
+               return err
+       }
+
+       // close stdout
+       if closeErr := stdout.Close(); closeErr != nil && err == nil {
+               log.Error(closeErr, "dbt close stdout failed.")
+               return errors.Convert(closeErr)
+       }
 
        return nil
 }

Reply via email to