This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.14 by this push:
     new 5e581c7bb fix: maybe can fix-2 dbt zombie processes (#4116)
5e581c7bb is described below

commit 5e581c7bbff3bc7ea15d365142de4cd36bf652ee
Author: abeizn <[email protected]>
AuthorDate: Thu Jan 5 18:07:18 2023 +0800

    fix: maybe can fix-2 dbt zombie processes (#4116)
---
 plugins/dbt/tasks/convertor.go | 36 +++++++++++++++++++++++-------------
 1 file changed, 23 insertions(+), 13 deletions(-)

diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 7115e660f..542d21c0f 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -115,11 +115,18 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
        defaultPackagesPath := filepath.Join(projectPath, "packages.yml")
        _, err = errors.Convert01(os.Stat(defaultPackagesPath))
        if err == nil {
-               cmd := exec.Command("dbt", "deps")
-               err = errors.Convert(cmd.Start())
-               if err != nil {
+               cmdDeps := exec.Command("dbt", "deps")
+               log.Info("dbt deps run script: ", cmdDeps)
+               // prevent zombie process
+               defer func() {
+                       if err := errors.Convert(cmdDeps.Wait()); err != nil {
+                               log.Error(nil, "dbt deps run cmd.cmdDeps() error")
+                       }
+               }()
+               if err = errors.Convert(cmdDeps.Start()); err != nil {
                        return err
                }
+
        }
        dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath}
        if projectVars != nil {
@@ -182,23 +189,24 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
        }
        cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
        log.Info("dbt run script: ", cmd)
-       stdout, _ := cmd.StdoutPipe()
+
+       stdout, stdoutErr := cmd.StdoutPipe()
+       if stdoutErr != nil {
+               return errors.Convert(stdoutErr)
+       }
+
        if err = errors.Convert(cmd.Start()); err != nil {
                return err
        }
-       // ProcessState contains information about an exited process, available after a call to Wait.
-       defer func() {
-               if !cmd.ProcessState.Success() {
-                       log.Error(nil, "dbt run task error, please check!!!")
-               }
-       }()
+       defer stdout.Close()
 
        // prevent zombie process
        defer func() {
-               err := errors.Convert(cmd.Wait())
-               if err != nil {
-                       log.Error(nil, "dbt run cmd.Wait() error")
+               ProcessState, err := cmd.Process.Wait()
+               if err != nil || !ProcessState.Success() {
+                       log.Error(err, "dbt run cmd.Wait() error")
                }
+               log.Info("End of dbt program execution.")
        }()
 
        scanner := bufio.NewScanner(stdout)
@@ -214,11 +222,13 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
                }
        }
        if err := errors.Convert(scanner.Err()); err != nil {
+               log.Error(err, "dbt read stdout failed.")
                return err
        }
 
        // close stdout
        if closeErr := stdout.Close(); closeErr != nil && err == nil {
+               log.Error(closeErr, "dbt close stdout failed.")
                return errors.Convert(closeErr)
        }
 

Reply via email to