This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new c1b0b9bb5 fix: add docker init, solve zombie process (#4209)
c1b0b9bb5 is described below
commit c1b0b9bb52719be312d3d6f8935edbcee8294a13
Author: abeizn <[email protected]>
AuthorDate: Fri Jan 13 17:37:05 2023 +0800
fix: add docker init, solve zombie process (#4209)
---
backend/Dockerfile | 4 ++++
backend/plugins/dbt/dbt.go | 2 ++
backend/plugins/dbt/tasks/convertor.go | 23 ++++++++++++++---------
3 files changed, 20 insertions(+), 9 deletions(-)
diff --git a/backend/Dockerfile b/backend/Dockerfile
index b29320241..caa6725fa 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -156,5 +156,9 @@ COPY --from=build /app/resources/tap /app/resources/tap
ENV PATH="/app/bin:${PATH}"
+# add tini, prevent zombie process
+RUN apk add --no-cache tini
+ENTRYPOINT ["/sbin/tini", "--"]
+
CMD ["lake"]
diff --git a/backend/plugins/dbt/dbt.go b/backend/plugins/dbt/dbt.go
index 765cc432b..7cc225245 100644
--- a/backend/plugins/dbt/dbt.go
+++ b/backend/plugins/dbt/dbt.go
@@ -38,6 +38,7 @@ func main() {
failFast := dbtCmd.Flags().BoolP("failFast", "", false, "dbt fail fast")
profilesPath := dbtCmd.Flags().StringP("profilesPath", "", "/Users/abeizn/.dbt", "dbt profiles path")
profile := dbtCmd.Flags().StringP("profile", "", "default", "dbt profile")
+ threads := dbtCmd.Flags().IntP("threads", "", 1, "dbt threads")
noVersionCheck := dbtCmd.Flags().BoolP("noVersionCheck", "", false, "dbt no version check")
excludeModels := dbtCmd.Flags().StringSliceP("excludeModels", "", []string{}, "dbt exclude models")
selector := dbtCmd.Flags().StringP("selector", "", "", "dbt selector")
@@ -67,6 +68,7 @@ func main() {
"failFast": *failFast,
"profilesPath": *profilesPath,
"profile": *profile,
+ "threads": *threads,
"noVersionCheck": *noVersionCheck,
"excludeModels": *excludeModels,
"selector": *selector,
diff --git a/backend/plugins/dbt/tasks/convertor.go b/backend/plugins/dbt/tasks/convertor.go
index daa5cce91..2335f0d28 100644
--- a/backend/plugins/dbt/tasks/convertor.go
+++ b/backend/plugins/dbt/tasks/convertor.go
@@ -20,8 +20,6 @@ package tasks
import (
"bufio"
"encoding/json"
- "github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/plugin"
"net"
"net/url"
"os"
@@ -30,10 +28,12 @@ import (
"strconv"
"strings"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
"github.com/spf13/viper"
)
-func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error {
+func DbtConverter(taskCtx plugin.SubTaskContext) (err errors.Error) {
logger := taskCtx.GetLogger()
taskCtx.SetProgress(0, -1)
data := taskCtx.GetData().(*DbtTaskData)
@@ -44,6 +44,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error {
projectVars := data.Options.ProjectVars
args := data.Options.Args
failFast := data.Options.FailFast
+ threads := data.Options.Threads
noVersionCheck := data.Options.NoVersionCheck
excludeModels := data.Options.ExcludeModels
selector := data.Options.Selector
@@ -55,7 +56,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error {
profile := data.Options.Profile
defaultProfilesPath := filepath.Join(projectPath, "profiles.yml")
- _, err := errors.Convert01(os.Stat(defaultProfilesPath))
+ _, err = errors.Convert01(os.Stat(defaultProfilesPath))
// if profiles.yml not exist, create it manually
if err != nil {
dbUrl := taskCtx.GetConfig("DB_URL")
@@ -117,7 +118,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error {
logger.Info("dbt deps run script: ", cmdDeps)
// prevent zombie process
defer func() {
- if err := errors.Convert(cmdDeps.Wait()); err != nil {
+ if err = errors.Convert(cmdDeps.Wait()); err != nil {
logger.Error(nil, "dbt deps run cmd.cmdDeps() error")
}
}()
@@ -127,7 +128,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error {
}
//set default threads = 1, prevent dbt threads can not release, so occur zombie process
- dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath, "--threads", "1"}
+ dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath}
if projectVars != nil {
jsonProjectVars, err := json.Marshal(projectVars)
if err != nil {
@@ -146,6 +147,10 @@ func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error {
if failFast {
dbtExecParams = append(dbtExecParams, "--fail-fast")
}
+ if threads != 0 {
+ dbtExecParams = append(dbtExecParams, "--threads")
+ dbtExecParams = append(dbtExecParams, strconv.Itoa(threads))
+ }
if noVersionCheck {
dbtExecParams = append(dbtExecParams, "--no-version-check")
}
@@ -196,7 +201,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error {
// prevent zombie process
defer func() {
- err := errors.Convert(cmd.Wait())
+ err = errors.Convert(cmd.Wait())
if err != nil {
logger.Error(err, "The DBT project run failed!")
} else {
@@ -216,7 +221,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error {
taskCtx.IncProgress(1)
}
}
- if err := errors.Convert(scanner.Err()); err != nil {
+ if err = errors.Convert(scanner.Err()); err != nil {
logger.Error(err, "dbt read stdout failed.")
return err
}
@@ -227,7 +232,7 @@ func DbtConverter(taskCtx plugin.SubTaskContext) errors.Error {
return errors.Convert(closeErr)
}
- return nil
+ return err
}
var DbtConverterMeta = plugin.SubTaskMeta{