This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new c1b0b9bb5 fix: add docker init, solve zombie process (#4209)
c1b0b9bb5 is described below

commit c1b0b9bb52719be312d3d6f8935edbcee8294a13
Author: abeizn <[email protected]>
AuthorDate: Fri Jan 13 17:37:05 2023 +0800

    fix: add docker init, solve zombie process (#4209)
---
 backend/Dockerfile                     |  4 ++++
 backend/plugins/dbt/dbt.go             |  2 ++
 backend/plugins/dbt/tasks/convertor.go | 23 ++++++++++++++---------
 3 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/backend/Dockerfile b/backend/Dockerfile
index b29320241..caa6725fa 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -156,5 +156,9 @@ COPY --from=build /app/resources/tap /app/resources/tap
 
 ENV PATH="/app/bin:${PATH}"
 
+# add tini, prevent zombie process
+RUN apk add --no-cache tini
+ENTRYPOINT ["/sbin/tini", "--"]
+
 CMD ["lake"]
 
diff --git a/backend/plugins/dbt/dbt.go b/backend/plugins/dbt/dbt.go
index 765cc432b..7cc225245 100644
--- a/backend/plugins/dbt/dbt.go
+++ b/backend/plugins/dbt/dbt.go
@@ -38,6 +38,7 @@ func main() {
        failFast := dbtCmd.Flags().BoolP("failFast", "", false, "dbt fail fast")
        profilesPath := dbtCmd.Flags().StringP("profilesPath", "", 
"/Users/abeizn/.dbt", "dbt profiles path")
        profile := dbtCmd.Flags().StringP("profile", "", "default", "dbt 
profile")
+       threads := dbtCmd.Flags().IntP("threads", "", 1, "dbt threads")
        noVersionCheck := dbtCmd.Flags().BoolP("noVersionCheck", "", false, 
"dbt no version check")
        excludeModels := dbtCmd.Flags().StringSliceP("excludeModels", "", 
[]string{}, "dbt exclude models")
        selector := dbtCmd.Flags().StringP("selector", "", "", "dbt selector")
@@ -67,6 +68,7 @@ func main() {
                        "failFast":       *failFast,
                        "profilesPath":   *profilesPath,
                        "profile":        *profile,
+                       "threads":        *threads,
                        "noVersionCheck": *noVersionCheck,
                        "excludeModels":  *excludeModels,
                        "selector":       *selector,
diff --git a/backend/plugins/dbt/tasks/convertor.go 
b/backend/plugins/dbt/tasks/convertor.go
index daa5cce91..2335f0d28 100644
--- a/backend/plugins/dbt/tasks/convertor.go
+++ b/backend/plugins/dbt/tasks/convertor.go
@@ -20,8 +20,6 @@ package tasks
 import (
        "bufio"
        "encoding/json"
-       "github.com/apache/incubator-devlake/core/errors"
-       "github.com/apache/incubator-devlake/core/plugin"
        "net"
        "net/url"
        "os"
@@ -30,10 +28,12 @@ import (
        "strconv"
        "strings"
 
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/plugin"
        "github.com/spf13/viper"
 )
 
-func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error {
+func DbtConverter(taskCtx plugin.SubTaskContext) (err errors.Error) {
        logger := taskCtx.GetLogger()
        taskCtx.SetProgress(0, -1)
        data := taskCtx.GetData().(*DbtTaskData)
@@ -44,6 +44,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error 
{
        projectVars := data.Options.ProjectVars
        args := data.Options.Args
        failFast := data.Options.FailFast
+       threads := data.Options.Threads
        noVersionCheck := data.Options.NoVersionCheck
        excludeModels := data.Options.ExcludeModels
        selector := data.Options.Selector
@@ -55,7 +56,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error 
{
        profile := data.Options.Profile
 
        defaultProfilesPath := filepath.Join(projectPath, "profiles.yml")
-       _, err := errors.Convert01(os.Stat(defaultProfilesPath))
+       _, err = errors.Convert01(os.Stat(defaultProfilesPath))
        // if profiles.yml not exist, create it manually
        if err != nil {
                dbUrl := taskCtx.GetConfig("DB_URL")
@@ -117,7 +118,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) 
errors.Error {
                logger.Info("dbt deps run script: ", cmdDeps)
                // prevent zombie process
                defer func() {
-                       if err := errors.Convert(cmdDeps.Wait()); err != nil {
+                       if err = errors.Convert(cmdDeps.Wait()); err != nil {
                                logger.Error(nil, "dbt deps run cmd.cmdDeps() 
error")
                        }
                }()
@@ -127,7 +128,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) 
errors.Error {
 
        }
        //set default threads = 1, prevent dbt threads can not release, so 
occur zombie process
-       dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath, 
"--threads", "1"}
+       dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath}
        if projectVars != nil {
                jsonProjectVars, err := json.Marshal(projectVars)
                if err != nil {
@@ -146,6 +147,10 @@ func DbtConverter(taskCtx plugin.SubTaskContext) 
errors.Error {
        if failFast {
                dbtExecParams = append(dbtExecParams, "--fail-fast")
        }
+       if threads != 0 {
+               dbtExecParams = append(dbtExecParams, "--threads")
+               dbtExecParams = append(dbtExecParams, strconv.Itoa(threads))
+       }
        if noVersionCheck {
                dbtExecParams = append(dbtExecParams, "--no-version-check")
        }
@@ -196,7 +201,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) 
errors.Error {
 
        // prevent zombie process
        defer func() {
-               err := errors.Convert(cmd.Wait())
+               err = errors.Convert(cmd.Wait())
                if err != nil {
                        logger.Error(err, "The DBT project run failed!")
                } else {
@@ -216,7 +221,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) 
errors.Error {
                        taskCtx.IncProgress(1)
                }
        }
-       if err := errors.Convert(scanner.Err()); err != nil {
+       if err = errors.Convert(scanner.Err()); err != nil {
                logger.Error(err, "dbt read stdout failed.")
                return err
        }
@@ -227,7 +232,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) 
errors.Error {
                return errors.Convert(closeErr)
        }
 
-       return nil
+       return err
 }
 
 var DbtConverterMeta = plugin.SubTaskMeta{

Reply via email to