This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new e38b46152 feat: pipeline/scope-deletion mutual exclusive (#5609)
e38b46152 is described below

commit e38b4615258398fe3c3f0c4df4400591d08326d1
Author: Klesh Wong <[email protected]>
AuthorDate: Tue Jul 4 14:21:11 2023 +0800

    feat: pipeline/scope-deletion mutual exclusive (#5609)
    
    * feat: pipeline/scope-deletion mutual exclusive
    
    * fix: ci e2e-test failed
    
    * fix: too many plugins compile in parallel causing sigkill
    
    * fix: add os dependency
    
    * fix: allow emptying scope even if it is referenced
    
    * fix: debuging line and closing channel
    
    * fix: prevent writing to a closed channel
---
 .gitignore                                         |   1 +
 backend/Makefile                                   |   8 +-
 backend/core/dal/dal.go                            |   4 +
 backend/core/errors/util.go                        |  13 +++
 backend/core/runner/db.go                          |   2 +-
 backend/helpers/dbhelper/txhelper.go               |  96 +++++++++++++++++
 .../pluginhelper/api/scope_generic_helper.go       |  34 ++++--
 backend/helpers/pluginhelper/api/scope_helper.go   |   7 +-
 backend/impls/dalgorm/dalgorm.go                   |   2 +-
 backend/impls/dalgorm/dalgorm_transaction.go       |  52 +++++++++
 backend/python/DevelopmentSetup.md                 |  11 ++
 backend/scripts/compile-plugins.sh                 |   8 ++
 backend/server/api/shared/api_output.go            |  10 +-
 backend/server/services/init.go                    |  22 +---
 backend/server/services/locking.go                 |  54 ++++------
 backend/server/services/pipeline.go                | 116 ++++++++++++---------
 backend/server/services/pipeline_helper.go         |  79 +++++---------
 17 files changed, 343 insertions(+), 176 deletions(-)

diff --git a/.gitignore b/.gitignore
index 3cdcad23a..a9d2bbe8a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -159,3 +159,4 @@ backend/server/api/docs/docs.go
 # python
 *.pyc
 __pycache__
+venv
\ No newline at end of file
diff --git a/backend/Makefile b/backend/Makefile
index e768d79fb..38a18751a 100644
--- a/backend/Makefile
+++ b/backend/Makefile
@@ -102,7 +102,6 @@ python-unit-test: build-pydevlake
        sh ./python/run_tests.sh
 
 e2e-plugins-test:
-       exit_code=0;\
        export ENV_PATH=$(shell readlink -f .env);\
        set -e;\
        for m in $$(go list ./plugins/... | egrep 'e2e'); do \
@@ -118,11 +117,12 @@ e2e-test-init:
        go run ./test/init.go || exit $$?;\
 
 e2e-test-run:
-       exit_code=0;\
        for m in $$(go list ./test/e2e/... | grep -v manual); do \
-               echo $$m; go test -p 1 -timeout 300s -v $$m || exit_code=$$?; \
+               echo $$m; \
+               if ! go test -p 1 -timeout 300s -v $$m ; then \
+                 exit $$?; \
+               fi; \
        done; \
-       exit $$exit_code
 
 e2e-test: e2e-test-init e2e-test-run
 
diff --git a/backend/core/dal/dal.go b/backend/core/dal/dal.go
index 089c428b9..c4bddd1d4 100644
--- a/backend/core/dal/dal.go
+++ b/backend/core/dal/dal.go
@@ -187,6 +187,10 @@ type Transaction interface {
        Dal
        Rollback() errors.Error
        Commit() errors.Error
+       // table: exclusive
+       LockTables(tables map[string]bool) errors.Error
+       UnlockTables() errors.Error
+       // End(err *errors.Error)
 }
 
 type Rows interface {
diff --git a/backend/core/errors/util.go b/backend/core/errors/util.go
index 5c6b0559a..01cf382b0 100644
--- a/backend/core/errors/util.go
+++ b/backend/core/errors/util.go
@@ -28,3 +28,16 @@ func Is(err, target error) bool {
 func As(err error, target any) bool {
        return errors.As(err, &target)
 }
+
+func Must(err error) {
+       if err != nil {
+               panic(err)
+       }
+}
+
+func Must1[T any](t T, err error) T {
+       if err != nil {
+               panic(err)
+       }
+       return t
+}
diff --git a/backend/core/runner/db.go b/backend/core/runner/db.go
index bec811059..1e1baba17 100644
--- a/backend/core/runner/db.go
+++ b/backend/core/runner/db.go
@@ -72,7 +72,7 @@ func NewGormDbEx(configReader config.ConfigReader, logger 
log.Logger, sessionCon
                                Colorful:                  true,           // 
Disable color
                        },
                ),
-               PrepareStmt:            sessionConfig.PrepareStmt,
+               // PrepareStmt:            sessionConfig.PrepareStmt,
                SkipDefaultTransaction: sessionConfig.SkipDefaultTransaction,
        }
        dbUrl := configReader.GetString("DB_URL")
diff --git a/backend/helpers/dbhelper/txhelper.go 
b/backend/helpers/dbhelper/txhelper.go
new file mode 100644
index 000000000..307399abd
--- /dev/null
+++ b/backend/helpers/dbhelper/txhelper.go
@@ -0,0 +1,96 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dbhelper
+
+import (
+       "fmt"
+       "reflect"
+       "time"
+
+       "github.com/apache/incubator-devlake/core/context"
+       "github.com/apache/incubator-devlake/core/dal"
+       "github.com/apache/incubator-devlake/core/errors"
+)
+
+// TxHelper is a helper for transaction management
+type TxHelper[E error] struct {
+       basicRes context.BasicRes
+       perr     *E
+       tx       dal.Transaction
+}
+
+// Begin starts a transaction
+func (l *TxHelper[E]) Begin() dal.Transaction {
+       if l.tx != nil {
+               panic(fmt.Errorf("Begin has been called"))
+       }
+       l.tx = l.basicRes.GetDal().Begin()
+       return l.tx
+}
+
+// LockTablesTimeout locks tables with timeout
+func (l *TxHelper[E]) LockTablesTimeout(timeout time.Duration, tables 
map[string]bool) errors.Error {
+       println("timeout", timeout)
+       c := make(chan errors.Error, 1)
+       go func() {
+               c <- l.tx.LockTables(tables)
+               close(c)
+       }()
+
+       select {
+       case err := <-c:
+               if err != nil {
+                       panic(err)
+               }
+       case <-time.After(timeout):
+               return errors.Timeout.New("lock tables timeout: " + 
fmt.Sprintf("%v", tables))
+       }
+       return nil
+}
+
+// End ends a transaction and commits it if no error was set or it will try to 
rollback and release locked tables
+func (l *TxHelper[E]) End() {
+       if l.tx == nil {
+               panic("Begin was never called")
+       }
+       var msg string
+       err := *l.perr
+       if !reflect.ValueOf(err).IsValid() {
+               r := recover()
+               msg = fmt.Sprintf("%v", r)
+       } else {
+               msg = err.Error()
+       }
+       if msg == "" {
+               errors.Must(l.tx.Commit())
+       }
+       if msg != "" {
+               // l.basicRes.GetLogger().Error(fmt.Errorf(msg), "TxHelper")
+               _ = l.tx.UnlockTables()
+               _ = l.tx.Rollback()
+       }
+       l.tx = nil
+}
+
+// NewTxHelper creates a new TxHelper, the errorPointer is used to detect if 
any error was set
+func NewTxHelper[E error](basicRes context.BasicRes, errorPointer *E) 
*TxHelper[E] {
+       if errorPointer == nil {
+               panic(fmt.Errorf("errorPointer is required"))
+       }
+       return &TxHelper[E]{basicRes: basicRes, perr: errorPointer}
+}
diff --git a/backend/helpers/pluginhelper/api/scope_generic_helper.go 
b/backend/helpers/pluginhelper/api/scope_generic_helper.go
index d6ab4642b..985732f54 100644
--- a/backend/helpers/pluginhelper/api/scope_generic_helper.go
+++ b/backend/helpers/pluginhelper/api/scope_generic_helper.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/incubator-devlake/core/models/common"
        "github.com/apache/incubator-devlake/core/models/domainlayer/domaininfo"
        "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/dbhelper"
        serviceHelper 
"github.com/apache/incubator-devlake/helpers/pluginhelper/services"
        "github.com/go-playground/validator/v10"
        "github.com/mitchellh/mapstructure"
@@ -42,6 +43,7 @@ type NoScopeConfig struct{}
 
 type (
        GenericScopeApiHelper[Conn any, Scope plugin.ToolLayerScope, 
ScopeConfig any] struct {
+               basicRes         context.BasicRes
                log              log.Logger
                db               dal.Dal
                validator        *validator.Validate
@@ -119,6 +121,7 @@ func NewGenericScopeHelper[Conn any, Scope 
plugin.ToolLayerScope, ScopeConfig an
                opts = &ScopeHelperOptions{}
        }
        return &GenericScopeApiHelper[Conn, Scope, ScopeConfig]{
+               basicRes:         basicRes,
                log:              basicRes.GetLogger(),
                db:               basicRes.GetDal(),
                validator:        vld,
@@ -297,7 +300,24 @@ func (gs *GenericScopeApiHelper[Conn, Scope, ScopeConfig]) 
GetScope(input *plugi
        return scopeRes, nil
 }
 
-func (gs *GenericScopeApiHelper[Conn, Scope, ScopeConfig]) DeleteScope(input 
*plugin.ApiResourceInput) (*serviceHelper.BlueprintProjectPairs, errors.Error) {
+func (gs *GenericScopeApiHelper[Conn, Scope, ScopeConfig]) DeleteScope(input 
*plugin.ApiResourceInput) (refs *serviceHelper.BlueprintProjectPairs, err 
errors.Error) {
+       txHelper := dbhelper.NewTxHelper(gs.basicRes, &err)
+       defer txHelper.End()
+       tx := txHelper.Begin()
+       err = txHelper.LockTablesTimeout(2*time.Second, 
map[string]bool{"_devlake_pipelines": false})
+       if err != nil {
+               err = errors.BadInput.Wrap(err, "failed to lock pipeline table, 
is there any running pipeline or deletion?")
+               return
+       }
+       count := errors.Must1(tx.Count(
+               dal.From("_devlake_pipelines"),
+               dal.Where("status = ?", models.TASK_RUNNING),
+       ))
+       if count > 0 {
+               err = errors.BadInput.New("Forbid deleting data while there are 
pipelines running")
+               return
+       }
+       // time.Sleep(1 * time.Minute) # uncomment this line if you were to 
verify pipelines get blocked while deleting data
        params, err := gs.extractFromDeleteReqParam(input)
        if err != nil {
                return nil, err
@@ -310,12 +330,14 @@ func (gs *GenericScopeApiHelper[Conn, Scope, 
ScopeConfig]) DeleteScope(input *pl
        if err != nil {
                return nil, err
        }
-       // now we can as scope to state its `Params` for data bloodline 
identification
-       if refs, err := gs.getScopeReferences(params.connectionId, 
params.scopeId); err != nil || refs != nil {
-               if err != nil {
-                       return nil, err
+
+       if !params.deleteDataOnly {
+               if refs, err := gs.getScopeReferences(params.connectionId, 
params.scopeId); err != nil || refs != nil {
+                       if err != nil {
+                               return nil, err
+                       }
+                       return refs, errors.Conflict.New("Found one or more 
references to this scope")
                }
-               return refs, errors.Conflict.New("Found one or more references 
to this scope")
        }
        if err = gs.deleteScopeData(*scope); err != nil {
                return nil, err
diff --git a/backend/helpers/pluginhelper/api/scope_helper.go 
b/backend/helpers/pluginhelper/api/scope_helper.go
index e2bb6eb87..6474c47fe 100644
--- a/backend/helpers/pluginhelper/api/scope_helper.go
+++ b/backend/helpers/pluginhelper/api/scope_helper.go
@@ -23,6 +23,7 @@ import (
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/server/api/shared"
        "github.com/go-playground/validator/v10"
 )
 
@@ -97,7 +98,11 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) GetScope(input 
*plugin.ApiResourceInpu
 func (c *ScopeApiHelper[Conn, Scope, Tr]) Delete(input 
*plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
        refs, err := c.DeleteScope(input)
        if err != nil {
-               return &plugin.ApiResourceOutput{Body: refs, Status: 
err.GetType().GetHttpCode()}, nil
+               return &plugin.ApiResourceOutput{Body: &shared.ApiBody{
+                       Success: false,
+                       Message: err.Error(),
+                       Data:    refs,
+               }, Status: err.GetType().GetHttpCode()}, nil
        }
        return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusOK}, nil
 }
diff --git a/backend/impls/dalgorm/dalgorm.go b/backend/impls/dalgorm/dalgorm.go
index b356b4bf0..5acde4faa 100644
--- a/backend/impls/dalgorm/dalgorm.go
+++ b/backend/impls/dalgorm/dalgorm.go
@@ -436,5 +436,5 @@ func (d *Dalgorm) convertGormError(err error) errors.Error {
        if d.IsDuplicationError(err) {
                return errors.BadInput.WrapRaw(err)
        }
-       return errors.Default.WrapRaw(err)
+       panic(err)
 }
diff --git a/backend/impls/dalgorm/dalgorm_transaction.go 
b/backend/impls/dalgorm/dalgorm_transaction.go
index a6bb95b65..6426a1053 100644
--- a/backend/impls/dalgorm/dalgorm_transaction.go
+++ b/backend/impls/dalgorm/dalgorm_transaction.go
@@ -18,6 +18,8 @@ limitations under the License.
 package dalgorm
 
 import (
+       "fmt"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
 )
@@ -48,6 +50,56 @@ func (t *DalgormTransaction) Commit() errors.Error {
        return nil
 }
 
+func (t *DalgormTransaction) LockTables(tables map[string]bool) errors.Error {
+       switch t.Dialect() {
+       case "mysql":
+               // mysql lock all tables at once, each lock would release all 
previous locks
+               clause := ""
+               for table, exclusive := range tables {
+                       if clause != "" {
+                               clause += ", "
+                       }
+                       clause += table
+                       if exclusive {
+                               clause += " WRITE"
+                       } else {
+                               clause += " READ"
+                       }
+               }
+               return t.Exec(fmt.Sprintf("LOCK TABLES %s", clause))
+       case "postgres":
+               clause := ""
+               for table, exclusive := range tables {
+                       if clause != "" {
+                               clause += ", "
+                       }
+                       clause += table
+                       if exclusive {
+                               clause += "  IN EXCLUSIVE MODE"
+                       } else {
+                               clause += "  IN SHARE MODE"
+                       }
+               }
+               return t.Exec(fmt.Sprintf("LOCK TABLE %s", clause))
+       default:
+               panic(fmt.Errorf("unknown dialect %s", t.Dialect()))
+       }
+}
+
+func (t *DalgormTransaction) UnlockTables() errors.Error {
+       switch t.Dialect() {
+       case "mysql":
+               // mysql would not release lock automatically on Rollback
+               // according to 
https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html
+               return t.Exec("UNLOCK TABLES")
+       case "postgres":
+               // pg has no unlock tables
+               return nil
+       default:
+               panic(fmt.Errorf("unknown dialect %s", t.Dialect()))
+       }
+}
+
 func newTransaction(dalgorm *Dalgorm) *DalgormTransaction {
        return &DalgormTransaction{
                Dalgorm: NewDalgorm(dalgorm.db.Begin()),
diff --git a/backend/python/DevelopmentSetup.md 
b/backend/python/DevelopmentSetup.md
new file mode 100644
index 000000000..86a91887e
--- /dev/null
+++ b/backend/python/DevelopmentSetup.md
@@ -0,0 +1,11 @@
+
+# For `make e2e-test` to run properly, the following steps must be taken:
+
+1. The following packages are required for Ubuntu: `libffi-dev 
default-libmysqlclient-dev libpq-dev`
+2. `python3.9` is required by the time of this document. 
+   - Try `deadsnakes` if you are using Ubuntu 22.04 or above, the 
`python3.9-dev` is required.
+   - Use `virtualenv` if you are having multiple python versions. `virtualenv 
-p python3.9 path/to/venv` and `source path/to/venv/bin/activate.sh` should do 
the trick
+3. [poetry](https://python-poetry.org/) is required. 
+   - run `cd backend/python/pydevlake && poetry install`
+   - run `cd backend/python/plugins/azuredevops && poetry install`
+4. `sqlalchemy` won't work with `localhost` in the database connection string, 
use `127.0.0.1` instead
diff --git a/backend/scripts/compile-plugins.sh 
b/backend/scripts/compile-plugins.sh
index ad82cf6a8..7fad2bdf8 100644
--- a/backend/scripts/compile-plugins.sh
+++ b/backend/scripts/compile-plugins.sh
@@ -55,6 +55,14 @@ for PLUG in $PLUGINS; do
     echo "Building plugin $NAME to bin/plugins/$NAME/$NAME.so"
     go build -buildmode=plugin "$@" -o $PLUGIN_OUTPUT_DIR/$NAME/$NAME.so 
$PLUG/*.go &
     PIDS="$PIDS $!"
+    # avoid too many processes causing signal killed
+    COUNT=$(echo "$PIDS" | wc -w)
+    if [ "$COUNT" -ge "$(nproc)" ]; then
+        for PID in $PIDS; do
+            wait $PID
+        done
+        PIDS=""
+    fi
 done
 
 for PID in $PIDS; do
diff --git a/backend/server/api/shared/api_output.go 
b/backend/server/api/shared/api_output.go
index 5c5c776bd..086aaab9a 100644
--- a/backend/server/api/shared/api_output.go
+++ b/backend/server/api/shared/api_output.go
@@ -19,10 +19,11 @@ package shared
 
 import (
        "fmt"
+       "net/http"
+
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/impls/logruslog"
-       "net/http"
 
        "github.com/gin-gonic/gin"
 )
@@ -30,9 +31,10 @@ import (
 const BadRequestBody = "bad request body format"
 
 type ApiBody struct {
-       Success bool     `json:"success"`
-       Message string   `json:"message"`
-       Causes  []string `json:"causes"`
+       Success bool        `json:"success"`
+       Message string      `json:"message"`
+       Causes  []string    `json:"causes"`
+       Data    interface{} `json:"data"`
 }
 
 type ResponsePipelines struct {
diff --git a/backend/server/services/init.go b/backend/server/services/init.go
index f3f4e4e72..2e27a027e 100644
--- a/backend/server/services/init.go
+++ b/backend/server/services/init.go
@@ -18,7 +18,6 @@ limitations under the License.
 package services
 
 import (
-       "sync"
        "time"
 
        "github.com/apache/incubator-devlake/core/config"
@@ -30,8 +29,6 @@ import (
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/core/runner"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/services"
-       "github.com/apache/incubator-devlake/impls/dalgorm"
-       "github.com/apache/incubator-devlake/impls/logruslog"
        "github.com/go-playground/validator/v10"
        "github.com/robfig/cron/v3"
 )
@@ -44,7 +41,6 @@ var bpManager *services.BlueprintManager
 var basicRes context.BasicRes
 var migrator plugin.Migrator
 var cronManager *cron.Cron
-var cronLocker sync.Mutex
 var vld *validator.Validate
 
 const failToCreateCronJob = "created cron job failed"
@@ -84,7 +80,7 @@ func Init() {
        InitResources()
 
        // lock the database to avoid multiple devlake instances from sharing 
the same one
-       lockDb()
+       lockDatabase()
 
        var err error
        // now, load the plugins
@@ -136,19 +132,3 @@ func ExecuteMigration() errors.Error {
 func MigrationRequireConfirmation() bool {
        return migrator.HasPendingScripts()
 }
-
-func lockDb() {
-       // gorm doesn't support creating a PrepareStmt=false session from a 
PrepareStmt=true
-       // but the lockDatabase needs PrepareStmt=false for table locking, we 
have to deal with it here
-       lockingDb, err := runner.NewGormDbEx(cfg, 
logruslog.Global.Nested("migrator db"), &dal.SessionConfig{
-               PrepareStmt:            false,
-               SkipDefaultTransaction: true,
-       })
-       if err != nil {
-               panic(err)
-       }
-       err = lockDatabase(dalgorm.NewDalgorm(lockingDb))
-       if err != nil {
-               panic(err)
-       }
-}
diff --git a/backend/server/services/locking.go 
b/backend/server/services/locking.go
index a84bcf077..3c6fe8b7a 100644
--- a/backend/server/services/locking.go
+++ b/backend/server/services/locking.go
@@ -18,6 +18,7 @@ limitations under the License.
 package services
 
 import (
+       "fmt"
        "os"
        "time"
 
@@ -32,52 +33,33 @@ var lockingTx dal.Transaction
 
 // lockDatabase prevents multiple devlake instances from sharing the same 
lockDatabase
 // check the models.LockingHistory for the detail
-func lockDatabase(db dal.Dal) errors.Error {
+func lockDatabase() {
+       db := basicRes.GetDal()
        // first, register the instance
-       err := db.AutoMigrate(&models.LockingHistory{})
-       if err != nil {
-               return err
-       }
-       hostName, e := os.Hostname()
-       if e != nil {
-               return errors.Convert(e)
-       }
+       errors.Must(db.AutoMigrate(&models.LockingHistory{}))
+       hostName := errors.Must1(os.Hostname())
        lockingHistory := &models.LockingHistory{
                HostName: hostName,
                Version:  version.Version,
        }
-       err = db.Create(lockingHistory)
-       if err != nil {
-               return err
-       }
-       // 2. obtain the lock
-       err = db.AutoMigrate(&models.LockingStub{})
-       if err != nil {
-               return err
-       }
-       lockingTx = db.Begin()
-       c := make(chan error, 1)
-
+       errors.Must(db.Create(lockingHistory))
+       // 2. obtain the lock: using a never released transaction
        // This prevent multiple devlake instances from sharing the same 
database by locking the migration history table
        // However, it would not work if any older devlake instances were 
already using the database.
+       lockingTx = db.Begin()
+       c := make(chan bool, 1)
        go func() {
-               switch db.Dialect() {
-               case "mysql":
-                       c <- lockingTx.Exec("LOCK TABLE _devlake_locking_stub 
WRITE")
-               case "postgres":
-                       c <- lockingTx.Exec("LOCK TABLE _devlake_locking_stub 
IN EXCLUSIVE MODE")
-               }
+               errors.Must(lockingTx.AutoMigrate(&models.LockingStub{}))
+               
errors.Must(lockingTx.LockTables(map[string]bool{"_devlake_locking_stub": 
true}))
+               lockingHistory.Succeeded = true
+               errors.Must(db.Update(lockingHistory))
+               c <- true
        }()
 
+       // 3. update the record
        select {
-       case err := <-c:
-               if err != nil {
-                       return errors.Convert(err)
-               }
-       case <-time.After(2 * time.Second):
-               return errors.Default.New("locking _devlake_locking_stub 
timeout, the database might be locked by another devlake instance")
+       case <-c:
+       case <-time.After(3 * time.Second):
+               panic(fmt.Errorf("locking _devlake_locking_stub timeout, the 
database might be locked by another devlake instance"))
        }
-       // 3. update the record
-       lockingHistory.Succeeded = true
-       return db.Update(lockingHistory)
 }
diff --git a/backend/server/services/pipeline.go 
b/backend/server/services/pipeline.go
index 73c3dbd0b..b451cf4dc 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -31,6 +31,7 @@ import (
        "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/core/utils"
+       "github.com/apache/incubator-devlake/helpers/dbhelper"
        "github.com/apache/incubator-devlake/impls/logruslog"
        "github.com/google/uuid"
        v11 "go.temporal.io/api/enums/v1"
@@ -166,67 +167,81 @@ func GetPipelineLogsArchivePath(pipeline 
*models.Pipeline) (string, errors.Error
        return archive, err
 }
 
+func dequeuePipeline(runningParallelLabels []string) (pipeline 
*models.Pipeline, err errors.Error) {
+       pipeline = &models.Pipeline{}
+       txHelper := dbhelper.NewTxHelper(basicRes, &err)
+       defer txHelper.End()
+       tx := txHelper.Begin()
+       // mysql read lock, not sure if it works for postgresql
+       globalPipelineLog.Debug("acquire lock")
+       errors.Must(tx.LockTables(map[string]bool{
+               "_devlake_pipelines":       false,
+               "_devlake_pipeline_labels": false,
+       }))
+       // prepare query to find an appropriate pipeline to execute
+       err = tx.First(pipeline,
+               dal.Where("status IN ?", []string{models.TASK_CREATED, 
models.TASK_RERUN}),
+               dal.Join(
+                       `left join _devlake_pipeline_labels ON
+                               _devlake_pipeline_labels.pipeline_id = 
_devlake_pipelines.id AND
+                               _devlake_pipeline_labels.name LIKE 'parallel/%' 
AND
+                               _devlake_pipeline_labels.name in ?`,
+                       runningParallelLabels,
+               ),
+               dal.Groupby("id"),
+               dal.Having("count(_devlake_pipeline_labels.name)=0"),
+               dal.Select("id"),
+               dal.Orderby("id ASC"),
+               dal.Limit(1),
+       )
+       if err == nil {
+               // mark the pipeline running, now we want a write lock
+               errors.Must(tx.LockTables(map[string]bool{
+                       "_devlake_pipelines": true,
+               }))
+               err = tx.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
+                       {ColumnName: "status", Value: models.TASK_RUNNING},
+                       {ColumnName: "message", Value: ""},
+                       {ColumnName: "began_at", Value: time.Now()},
+               }, dal.Where("id = ?", pipeline.ID))
+               if err != nil {
+                       panic(err)
+               }
+               return
+       }
+       if !tx.IsErrorNotFound(err) {
+               // log unexpected err
+               globalPipelineLog.Error(err, "dequeue failed")
+       }
+       return
+}
+
 // RunPipelineInQueue query pipeline from db and run it in a queue
 func RunPipelineInQueue(pipelineMaxParallel int64) {
        sema := semaphore.NewWeighted(pipelineMaxParallel)
        runningParallelLabels := []string{}
        var runningParallelLabelLock sync.Mutex
+       var err error
        for {
-               globalPipelineLog.Info("acquire lock")
                // start goroutine when sema lock ready and pipeline exist.
                // to avoid read old pipeline, acquire lock before read exist 
pipeline
-               err := sema.Acquire(context.TODO(), 1)
-               if err != nil {
-                       panic(err)
-               }
+               errors.Must(sema.Acquire(context.TODO(), 1))
                globalPipelineLog.Info("get lock and wait next pipeline")
-               dbPipeline := &models.Pipeline{}
+               var dbPipeline *models.Pipeline
                for {
-                       cronLocker.Lock()
-                       // prepare query to find an appropriate pipeline to 
execute
-                       err := db.First(dbPipeline,
-                               dal.Where("status IN ?", 
[]string{models.TASK_CREATED, models.TASK_RERUN}),
-                               dal.Join(
-                                       `left join _devlake_pipeline_labels ON
-                                               
_devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
-                                               _devlake_pipeline_labels.name 
LIKE 'parallel/%' AND
-                                               _devlake_pipeline_labels.name 
in ?`,
-                                       runningParallelLabels,
-                               ),
-                               dal.Groupby("id"),
-                               
dal.Having("count(_devlake_pipeline_labels.name)=0"),
-                               dal.Select("id"),
-                               dal.Orderby("id ASC"),
-                               dal.Limit(1),
-                       )
-                       cronLocker.Unlock()
+                       dbPipeline, err = dequeuePipeline(runningParallelLabels)
                        if err == nil {
-                               // next pipeline found
                                break
                        }
-                       if !db.IsErrorNotFound(err) {
-                               // log unexpected err
-                               globalPipelineLog.Error(err, "dequeue failed")
-                       }
                        time.Sleep(time.Second)
                }
 
-               // mark the pipeline running
-               err = db.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
-                       {ColumnName: "status", Value: models.TASK_RUNNING},
-                       {ColumnName: "message", Value: ""},
-                       {ColumnName: "began_at", Value: time.Now()},
-               }, dal.Where("id = ?", dbPipeline.ID))
+               err = fillPipelineDetail(dbPipeline)
                if err != nil {
                        panic(err)
                }
-
                // add pipelineParallelLabels to runningParallelLabels
                var pipelineParallelLabels []string
-               err = fillPipelineDetail(dbPipeline)
-               if err != nil {
-                       panic(err)
-               }
                for _, dbLabel := range dbPipeline.Labels {
                        if strings.HasPrefix(dbLabel, `parallel/`) {
                                pipelineParallelLabels = 
append(pipelineParallelLabels, dbLabel)
@@ -376,8 +391,6 @@ func NotifyExternal(pipelineId uint64) errors.Error {
 // CancelPipeline FIXME ...
 func CancelPipeline(pipelineId uint64) errors.Error {
        // prevent RunPipelineInQueue from consuming pending pipelines
-       cronLocker.Lock()
-       defer cronLocker.Unlock()
        pipeline := &models.Pipeline{}
        err := db.First(pipeline, dal.Where("id = ?", pipelineId))
        if err != nil {
@@ -433,13 +446,20 @@ func getPipelineLogsPath(pipeline *models.Pipeline) 
(string, errors.Error) {
 }
 
 // RerunPipeline would rerun all failed tasks or specified task
-func RerunPipeline(pipelineId uint64, task *models.Task) ([]*models.Task, 
errors.Error) {
+func RerunPipeline(pipelineId uint64, task *models.Task) (tasks 
[]*models.Task, err errors.Error) {
        // prevent pipeline executor from doing anything that might jeopardize 
the integrity
-       cronLocker.Lock()
-       defer cronLocker.Unlock()
+       pipeline := &models.Pipeline{}
+       txHelper := dbhelper.NewTxHelper(basicRes, &err)
+       tx := txHelper.Begin()
+       defer txHelper.End()
+       err = txHelper.LockTablesTimeout(2*time.Second, 
map[string]bool{"_devlake_pipelines": true})
+       if err != nil {
+               err = errors.BadInput.Wrap(err, "failed to lock pipeline table, 
is there any pending pipeline or deletion?")
+               return
+       }
 
        // load the pipeline
-       pipeline, err := GetPipeline(pipelineId)
+       err = tx.First(pipeline, dal.Where("id = ?", pipelineId))
        if err != nil {
                return nil, err
        }
@@ -482,7 +502,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) 
([]*models.Task, errors
        for _, t := range failedTasks {
                // mark previous task failed
                t.Status = models.TASK_FAILED
-               err := db.UpdateColumn(t, "status", models.TASK_FAILED)
+               err := tx.UpdateColumn(t, "status", models.TASK_FAILED)
                if err != nil {
                        return nil, err
                }
@@ -514,7 +534,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) 
([]*models.Task, errors
        }
 
        // mark pipline rerun
-       err = db.UpdateColumn(&models.Pipeline{},
+       err = tx.UpdateColumn(&models.Pipeline{},
                "status", models.TASK_RERUN,
                dal.Where("id = ?", pipelineId),
        )
diff --git a/backend/server/services/pipeline_helper.go 
b/backend/server/services/pipeline_helper.go
index c0dd11cb6..639bed861 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -20,51 +20,39 @@ package services
 import (
        "encoding/json"
        "fmt"
+       "time"
 
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models"
+       "github.com/apache/incubator-devlake/helpers/dbhelper"
 )
 
 // CreateDbPipeline returns a NewPipeline
-func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.Pipeline, 
errors.Error) {
-       cronLocker.Lock()
-       defer cronLocker.Unlock()
+func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline 
*models.Pipeline, err errors.Error) {
+       pipeline = &models.Pipeline{}
+       txHelper := dbhelper.NewTxHelper(basicRes, &err)
+       defer txHelper.End()
+       tx := txHelper.Begin()
+       errors.Must(txHelper.LockTablesTimeout(2*time.Second, map[string]bool{
+               "_devlake_pipelines":       true,
+               "_devlake_pipeline_labels": true,
+       }))
+       if err != nil {
+               err = errors.BadInput.Wrap(err, "failed to lock pipeline table, 
is there any pending pipeline or deletion?")
+               return
+       }
        if newPipeline.BlueprintId > 0 {
-               clauses := []dal.Clause{
+               count := errors.Must1(tx.Count(
                        dal.From(&models.Pipeline{}),
                        dal.Where("blueprint_id = ? AND status IN ?", 
newPipeline.BlueprintId, models.PendingTaskStatus),
-               }
-               count, err := db.Count(clauses...)
-               if err != nil {
-                       return nil, errors.Default.Wrap(err, "query pipelines 
error")
-               }
+               ))
                // some pipeline is ruunning , get the detail and output them.
                if count > 0 {
-                       cursor, err := db.Cursor(clauses...)
-                       if err != nil {
-                               return nil, errors.Default.Wrap(err, 
fmt.Sprintf("query pipelines error but count it success. count:%d", count))
-                       }
-                       defer cursor.Close()
-                       fetched := 0
-                       errstr := ""
-                       for cursor.Next() {
-                               pipeline := &models.Pipeline{}
-                               err = db.Fetch(cursor, pipeline)
-                               if err != nil {
-                                       return nil, errors.Default.Wrap(err, 
fmt.Sprintf("failed to Fetch pipelines fetched:[%d],count:[%d]", fetched, 
count))
-                               }
-                               fetched++
-
-                               errstr += fmt.Sprintf("pipeline:[%d] on 
state:[%s] Pending it\r\n", pipeline.ID, pipeline.Status)
-                       }
-                       return nil, errors.Default.New(fmt.Sprintf("the 
blueprint is running fetched:[%d],count:[%d]:\r\n%s", fetched, count, errstr))
+                       return nil, errors.BadInput.New("there are pending 
pipelines of current blueprint already")
                }
        }
-       planByte, err := errors.Convert01(json.Marshal(newPipeline.Plan))
-       if err != nil {
-               return nil, err
-       }
+       planByte := errors.Must1(json.Marshal(newPipeline.Plan))
        // create pipeline object from posted data
        dbPipeline := &models.Pipeline{
                Name:          newPipeline.Name,
@@ -80,11 +68,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(*models.Pipeline, errors
        }
 
        // save pipeline to database
-       if err := db.Create(&dbPipeline); err != nil {
-               globalPipelineLog.Error(err, "create pipeline failed: %v", err)
-               return nil, errors.Internal.Wrap(err, "create pipeline failed")
-       }
-
+       errors.Must(tx.Create(dbPipeline))
        labels := make([]models.DbPipelineLabel, 0)
        for _, label := range newPipeline.Labels {
                labels = append(labels, models.DbPipelineLabel{
@@ -93,10 +77,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(*models.Pipeline, errors
                })
        }
        if len(newPipeline.Labels) > 0 {
-               if err := db.Create(&labels); err != nil {
-                       globalPipelineLog.Error(err, "create pipeline's 
labelModels failed: %v", err)
-                       return nil, errors.Internal.Wrap(err, "create 
pipeline's labelModels failed")
-               }
+               errors.Must(tx.Create(&labels))
        }
 
        // create tasks accordingly
@@ -110,28 +91,18 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(*models.Pipeline, errors
                                PipelineRow:  i + 1,
                                PipelineCol:  j + 1,
                        }
-                       _, err := CreateTask(newTask)
-                       if err != nil {
-                               globalPipelineLog.Error(err, "create task for 
pipeline failed: %v", err)
-                               return nil, err
-                       }
+                       _ = errors.Must1(CreateTask(newTask))
                        // sync task state back to pipeline
                        dbPipeline.TotalTasks += 1
                }
        }
-       if err != nil {
-               globalPipelineLog.Error(err, "save tasks for pipeline failed: 
%v", err)
-               return nil, errors.Internal.Wrap(err, "save tasks for pipeline 
failed")
-       }
        if dbPipeline.TotalTasks == 0 {
-               return nil, errors.Internal.New("no task to run")
+               err = errors.BadInput.New("no task to run")
+               return
        }
 
        // update tasks state
-       if err := db.Update(dbPipeline); err != nil {
-               globalPipelineLog.Error(err, "update pipline state failed: %v", 
err)
-               return nil, errors.Internal.Wrap(err, "update pipline state 
failed")
-       }
+       errors.Must(tx.Update(dbPipeline))
        dbPipeline.Labels = newPipeline.Labels
        return dbPipeline, nil
 }

Reply via email to