This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new e38b46152 feat: pipeline/scope-deletion mutual exclusive (#5609)
e38b46152 is described below
commit e38b4615258398fe3c3f0c4df4400591d08326d1
Author: Klesh Wong <[email protected]>
AuthorDate: Tue Jul 4 14:21:11 2023 +0800
feat: pipeline/scope-deletion mutual exclusive (#5609)
* feat: pipeline/scope-deletion mutual exclusive
* fix: ci e2e-test failed
* fix: too many plugins compile in parallel causing sigkill
* fix: add os dependency
* fix: allow emptying scope even if it is referenced
* fix: debugging line and closing channel
* fix: prevent writing to a closed channel
---
.gitignore | 1 +
backend/Makefile | 8 +-
backend/core/dal/dal.go | 4 +
backend/core/errors/util.go | 13 +++
backend/core/runner/db.go | 2 +-
backend/helpers/dbhelper/txhelper.go | 96 +++++++++++++++++
.../pluginhelper/api/scope_generic_helper.go | 34 ++++--
backend/helpers/pluginhelper/api/scope_helper.go | 7 +-
backend/impls/dalgorm/dalgorm.go | 2 +-
backend/impls/dalgorm/dalgorm_transaction.go | 52 +++++++++
backend/python/DevelopmentSetup.md | 11 ++
backend/scripts/compile-plugins.sh | 8 ++
backend/server/api/shared/api_output.go | 10 +-
backend/server/services/init.go | 22 +---
backend/server/services/locking.go | 54 ++++------
backend/server/services/pipeline.go | 116 ++++++++++++---------
backend/server/services/pipeline_helper.go | 79 +++++---------
17 files changed, 343 insertions(+), 176 deletions(-)
diff --git a/.gitignore b/.gitignore
index 3cdcad23a..a9d2bbe8a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -159,3 +159,4 @@ backend/server/api/docs/docs.go
# python
*.pyc
__pycache__
+venv
\ No newline at end of file
diff --git a/backend/Makefile b/backend/Makefile
index e768d79fb..38a18751a 100644
--- a/backend/Makefile
+++ b/backend/Makefile
@@ -102,7 +102,6 @@ python-unit-test: build-pydevlake
sh ./python/run_tests.sh
e2e-plugins-test:
- exit_code=0;\
export ENV_PATH=$(shell readlink -f .env);\
set -e;\
for m in $$(go list ./plugins/... | egrep 'e2e'); do \
@@ -118,11 +117,12 @@ e2e-test-init:
go run ./test/init.go || exit $$?;\
e2e-test-run:
- exit_code=0;\
for m in $$(go list ./test/e2e/... | grep -v manual); do \
- echo $$m; go test -p 1 -timeout 300s -v $$m || exit_code=$$?; \
+ echo $$m; \
+ if ! go test -p 1 -timeout 300s -v $$m ; then \
+ exit $$?; \
+ fi; \
done; \
- exit $$exit_code
e2e-test: e2e-test-init e2e-test-run
diff --git a/backend/core/dal/dal.go b/backend/core/dal/dal.go
index 089c428b9..c4bddd1d4 100644
--- a/backend/core/dal/dal.go
+++ b/backend/core/dal/dal.go
@@ -187,6 +187,10 @@ type Transaction interface {
Dal
Rollback() errors.Error
Commit() errors.Error
+ // tables: map of table name => exclusive flag (true = write lock, false = read lock)
+ LockTables(tables map[string]bool) errors.Error
+ UnlockTables() errors.Error
+ // End(err *errors.Error)
}
type Rows interface {
diff --git a/backend/core/errors/util.go b/backend/core/errors/util.go
index 5c6b0559a..01cf382b0 100644
--- a/backend/core/errors/util.go
+++ b/backend/core/errors/util.go
@@ -28,3 +28,16 @@ func Is(err, target error) bool {
func As(err error, target any) bool {
return errors.As(err, &target)
}
+
+func Must(err error) {
+ if err != nil {
+ panic(err)
+ }
+}
+
+func Must1[T any](t T, err error) T {
+ if err != nil {
+ panic(err)
+ }
+ return t
+}
diff --git a/backend/core/runner/db.go b/backend/core/runner/db.go
index bec811059..1e1baba17 100644
--- a/backend/core/runner/db.go
+++ b/backend/core/runner/db.go
@@ -72,7 +72,7 @@ func NewGormDbEx(configReader config.ConfigReader, logger
log.Logger, sessionCon
Colorful: true, //
Disable color
},
),
- PrepareStmt: sessionConfig.PrepareStmt,
+ // PrepareStmt: sessionConfig.PrepareStmt,
SkipDefaultTransaction: sessionConfig.SkipDefaultTransaction,
}
dbUrl := configReader.GetString("DB_URL")
diff --git a/backend/helpers/dbhelper/txhelper.go
b/backend/helpers/dbhelper/txhelper.go
new file mode 100644
index 000000000..307399abd
--- /dev/null
+++ b/backend/helpers/dbhelper/txhelper.go
@@ -0,0 +1,96 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dbhelper
+
+import (
+ "fmt"
+ "reflect"
+ "time"
+
+ "github.com/apache/incubator-devlake/core/context"
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+)
+
+// TxHelper is a helper for transaction management
+type TxHelper[E error] struct {
+ basicRes context.BasicRes
+ perr *E
+ tx dal.Transaction
+}
+
+// Begin starts a transaction
+func (l *TxHelper[E]) Begin() dal.Transaction {
+ if l.tx != nil {
+ panic(fmt.Errorf("Begin has been called"))
+ }
+ l.tx = l.basicRes.GetDal().Begin()
+ return l.tx
+}
+
+// LockTablesTimeout locks tables with timeout
+func (l *TxHelper[E]) LockTablesTimeout(timeout time.Duration, tables
map[string]bool) errors.Error {
+ println("timeout", timeout)
+ c := make(chan errors.Error, 1)
+ go func() {
+ c <- l.tx.LockTables(tables)
+ close(c)
+ }()
+
+ select {
+ case err := <-c:
+ if err != nil {
+ panic(err)
+ }
+ case <-time.After(timeout):
+ return errors.Timeout.New("lock tables timeout: " +
fmt.Sprintf("%v", tables))
+ }
+ return nil
+}
+
+// End ends a transaction: it commits if no error was set; otherwise it
tries to roll back and release any locked tables
+func (l *TxHelper[E]) End() {
+ if l.tx == nil {
+ panic("Begin was never called")
+ }
+ var msg string
+ err := *l.perr
+ if !reflect.ValueOf(err).IsValid() {
+ r := recover()
+ msg = fmt.Sprintf("%v", r)
+ } else {
+ msg = err.Error()
+ }
+ if msg == "" {
+ errors.Must(l.tx.Commit())
+ }
+ if msg != "" {
+ // l.basicRes.GetLogger().Error(fmt.Errorf(msg), "TxHelper")
+ _ = l.tx.UnlockTables()
+ _ = l.tx.Rollback()
+ }
+ l.tx = nil
+}
+
+// NewTxHelper creates a new TxHelper, the errorPointer is used to detect if
any error was set
+func NewTxHelper[E error](basicRes context.BasicRes, errorPointer *E)
*TxHelper[E] {
+ if errorPointer == nil {
+ panic(fmt.Errorf("errorPointer is required"))
+ }
+ return &TxHelper[E]{basicRes: basicRes, perr: errorPointer}
+}
diff --git a/backend/helpers/pluginhelper/api/scope_generic_helper.go
b/backend/helpers/pluginhelper/api/scope_generic_helper.go
index d6ab4642b..985732f54 100644
--- a/backend/helpers/pluginhelper/api/scope_generic_helper.go
+++ b/backend/helpers/pluginhelper/api/scope_generic_helper.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/incubator-devlake/core/models/common"
"github.com/apache/incubator-devlake/core/models/domainlayer/domaininfo"
"github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/dbhelper"
serviceHelper
"github.com/apache/incubator-devlake/helpers/pluginhelper/services"
"github.com/go-playground/validator/v10"
"github.com/mitchellh/mapstructure"
@@ -42,6 +43,7 @@ type NoScopeConfig struct{}
type (
GenericScopeApiHelper[Conn any, Scope plugin.ToolLayerScope,
ScopeConfig any] struct {
+ basicRes context.BasicRes
log log.Logger
db dal.Dal
validator *validator.Validate
@@ -119,6 +121,7 @@ func NewGenericScopeHelper[Conn any, Scope
plugin.ToolLayerScope, ScopeConfig an
opts = &ScopeHelperOptions{}
}
return &GenericScopeApiHelper[Conn, Scope, ScopeConfig]{
+ basicRes: basicRes,
log: basicRes.GetLogger(),
db: basicRes.GetDal(),
validator: vld,
@@ -297,7 +300,24 @@ func (gs *GenericScopeApiHelper[Conn, Scope, ScopeConfig])
GetScope(input *plugi
return scopeRes, nil
}
-func (gs *GenericScopeApiHelper[Conn, Scope, ScopeConfig]) DeleteScope(input
*plugin.ApiResourceInput) (*serviceHelper.BlueprintProjectPairs, errors.Error) {
+func (gs *GenericScopeApiHelper[Conn, Scope, ScopeConfig]) DeleteScope(input
*plugin.ApiResourceInput) (refs *serviceHelper.BlueprintProjectPairs, err
errors.Error) {
+ txHelper := dbhelper.NewTxHelper(gs.basicRes, &err)
+ defer txHelper.End()
+ tx := txHelper.Begin()
+ err = txHelper.LockTablesTimeout(2*time.Second,
map[string]bool{"_devlake_pipelines": false})
+ if err != nil {
+ err = errors.BadInput.Wrap(err, "failed to lock pipeline table,
is there any running pipeline or deletion?")
+ return
+ }
+ count := errors.Must1(tx.Count(
+ dal.From("_devlake_pipelines"),
+ dal.Where("status = ?", models.TASK_RUNNING),
+ ))
+ if count > 0 {
+ err = errors.BadInput.New("Forbid deleting data while there are
pipelines running")
+ return
+ }
+ // time.Sleep(1 * time.Minute) # uncomment this line if you want to
verify that pipelines get blocked while deleting data
params, err := gs.extractFromDeleteReqParam(input)
if err != nil {
return nil, err
@@ -310,12 +330,14 @@ func (gs *GenericScopeApiHelper[Conn, Scope,
ScopeConfig]) DeleteScope(input *pl
if err != nil {
return nil, err
}
- // now we can as scope to state its `Params` for data bloodline
identification
- if refs, err := gs.getScopeReferences(params.connectionId,
params.scopeId); err != nil || refs != nil {
- if err != nil {
- return nil, err
+
+ if !params.deleteDataOnly {
+ if refs, err := gs.getScopeReferences(params.connectionId,
params.scopeId); err != nil || refs != nil {
+ if err != nil {
+ return nil, err
+ }
+ return refs, errors.Conflict.New("Found one or more
references to this scope")
}
- return refs, errors.Conflict.New("Found one or more references
to this scope")
}
if err = gs.deleteScopeData(*scope); err != nil {
return nil, err
diff --git a/backend/helpers/pluginhelper/api/scope_helper.go
b/backend/helpers/pluginhelper/api/scope_helper.go
index e2bb6eb87..6474c47fe 100644
--- a/backend/helpers/pluginhelper/api/scope_helper.go
+++ b/backend/helpers/pluginhelper/api/scope_helper.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/server/api/shared"
"github.com/go-playground/validator/v10"
)
@@ -97,7 +98,11 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) GetScope(input
*plugin.ApiResourceInpu
func (c *ScopeApiHelper[Conn, Scope, Tr]) Delete(input
*plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
refs, err := c.DeleteScope(input)
if err != nil {
- return &plugin.ApiResourceOutput{Body: refs, Status:
err.GetType().GetHttpCode()}, nil
+ return &plugin.ApiResourceOutput{Body: &shared.ApiBody{
+ Success: false,
+ Message: err.Error(),
+ Data: refs,
+ }, Status: err.GetType().GetHttpCode()}, nil
}
return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusOK}, nil
}
diff --git a/backend/impls/dalgorm/dalgorm.go b/backend/impls/dalgorm/dalgorm.go
index b356b4bf0..5acde4faa 100644
--- a/backend/impls/dalgorm/dalgorm.go
+++ b/backend/impls/dalgorm/dalgorm.go
@@ -436,5 +436,5 @@ func (d *Dalgorm) convertGormError(err error) errors.Error {
if d.IsDuplicationError(err) {
return errors.BadInput.WrapRaw(err)
}
- return errors.Default.WrapRaw(err)
+ panic(err)
}
diff --git a/backend/impls/dalgorm/dalgorm_transaction.go
b/backend/impls/dalgorm/dalgorm_transaction.go
index a6bb95b65..6426a1053 100644
--- a/backend/impls/dalgorm/dalgorm_transaction.go
+++ b/backend/impls/dalgorm/dalgorm_transaction.go
@@ -18,6 +18,8 @@ limitations under the License.
package dalgorm
import (
+ "fmt"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
)
@@ -48,6 +50,56 @@ func (t *DalgormTransaction) Commit() errors.Error {
return nil
}
+func (t *DalgormTransaction) LockTables(tables map[string]bool) errors.Error {
+ switch t.Dialect() {
+ case "mysql":
+ // mysql locks all tables at once; each LOCK TABLES statement releases
all previously held locks
+ clause := ""
+ for table, exclusive := range tables {
+ if clause != "" {
+ clause += ", "
+ }
+ clause += table
+ if exclusive {
+ clause += " WRITE"
+ } else {
+ clause += " READ"
+ }
+ }
+ return t.Exec(fmt.Sprintf("LOCK TABLES %s", clause))
+ case "postgres":
+ clause := ""
+ for table, exclusive := range tables {
+ if clause != "" {
+ clause += ", "
+ }
+ clause += table
+ if exclusive {
+ clause += " IN EXCLUSIVE MODE"
+ } else {
+ clause += " IN SHARE MODE"
+ }
+ }
+ return t.Exec(fmt.Sprintf("LOCK TABLE %s", clause))
+ default:
+ panic(fmt.Errorf("unknown dialect %s", t.Dialect()))
+ }
+}
+
+func (t *DalgormTransaction) UnlockTables() errors.Error {
+ switch t.Dialect() {
+ case "mysql":
+ // mysql would not release lock automatically on Rollback
+ // according to
https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html
+ return t.Exec("UNLOCK TABLES")
+ case "postgres":
+ // pg has no unlock tables
+ return nil
+ default:
+ panic(fmt.Errorf("unknown dialect %s", t.Dialect()))
+ }
+}
+
func newTransaction(dalgorm *Dalgorm) *DalgormTransaction {
return &DalgormTransaction{
Dalgorm: NewDalgorm(dalgorm.db.Begin()),
diff --git a/backend/python/DevelopmentSetup.md
b/backend/python/DevelopmentSetup.md
new file mode 100644
index 000000000..86a91887e
--- /dev/null
+++ b/backend/python/DevelopmentSetup.md
@@ -0,0 +1,11 @@
+
+# For `make e2e-test` to run properly, the following steps must be taken:
+
+1. The following packages are required for Ubuntu: `libffi-dev
default-libmysqlclient-dev libpq-dev`
+2. `python3.9` is required as of this writing.
 - Try `deadsnakes` if you are using Ubuntu 22.04 or above; the
`python3.9-dev` package is required.
 - Use `virtualenv` if you have multiple python versions. `virtualenv
-p python3.9 path/to/venv` and `source path/to/venv/bin/activate.sh` should do
the trick.
+3. [poetry](https://python-poetry.org/) is required.
+ - run `cd backend/python/pydevlake && poetry install`
+ - run `cd backend/python/plugins/azuredevops && poetry install`
+4. `sqlalchemy` won't work with `localhost` in the database connection string,
use `127.0.0.1` instead
diff --git a/backend/scripts/compile-plugins.sh
b/backend/scripts/compile-plugins.sh
index ad82cf6a8..7fad2bdf8 100644
--- a/backend/scripts/compile-plugins.sh
+++ b/backend/scripts/compile-plugins.sh
@@ -55,6 +55,14 @@ for PLUG in $PLUGINS; do
echo "Building plugin $NAME to bin/plugins/$NAME/$NAME.so"
go build -buildmode=plugin "$@" -o $PLUGIN_OUTPUT_DIR/$NAME/$NAME.so
$PLUG/*.go &
PIDS="$PIDS $!"
+ # avoid spawning too many processes at once, which can cause them to be killed by SIGKILL (OOM)
+ COUNT=$(echo "$PIDS" | wc -w)
+ if [ "$COUNT" -ge "$(nproc)" ]; then
+ for PID in $PIDS; do
+ wait $PID
+ done
+ PIDS=""
+ fi
done
for PID in $PIDS; do
diff --git a/backend/server/api/shared/api_output.go
b/backend/server/api/shared/api_output.go
index 5c5c776bd..086aaab9a 100644
--- a/backend/server/api/shared/api_output.go
+++ b/backend/server/api/shared/api_output.go
@@ -19,10 +19,11 @@ package shared
import (
"fmt"
+ "net/http"
+
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/impls/logruslog"
- "net/http"
"github.com/gin-gonic/gin"
)
@@ -30,9 +31,10 @@ import (
const BadRequestBody = "bad request body format"
type ApiBody struct {
- Success bool `json:"success"`
- Message string `json:"message"`
- Causes []string `json:"causes"`
+ Success bool `json:"success"`
+ Message string `json:"message"`
+ Causes []string `json:"causes"`
+ Data interface{} `json:"data"`
}
type ResponsePipelines struct {
diff --git a/backend/server/services/init.go b/backend/server/services/init.go
index f3f4e4e72..2e27a027e 100644
--- a/backend/server/services/init.go
+++ b/backend/server/services/init.go
@@ -18,7 +18,6 @@ limitations under the License.
package services
import (
- "sync"
"time"
"github.com/apache/incubator-devlake/core/config"
@@ -30,8 +29,6 @@ import (
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/runner"
"github.com/apache/incubator-devlake/helpers/pluginhelper/services"
- "github.com/apache/incubator-devlake/impls/dalgorm"
- "github.com/apache/incubator-devlake/impls/logruslog"
"github.com/go-playground/validator/v10"
"github.com/robfig/cron/v3"
)
@@ -44,7 +41,6 @@ var bpManager *services.BlueprintManager
var basicRes context.BasicRes
var migrator plugin.Migrator
var cronManager *cron.Cron
-var cronLocker sync.Mutex
var vld *validator.Validate
const failToCreateCronJob = "created cron job failed"
@@ -84,7 +80,7 @@ func Init() {
InitResources()
// lock the database to avoid multiple devlake instances from sharing
the same one
- lockDb()
+ lockDatabase()
var err error
// now, load the plugins
@@ -136,19 +132,3 @@ func ExecuteMigration() errors.Error {
func MigrationRequireConfirmation() bool {
return migrator.HasPendingScripts()
}
-
-func lockDb() {
- // gorm doesn't support creating a PrepareStmt=false session from a
PrepareStmt=true
- // but the lockDatabase needs PrepareStmt=false for table locking, we
have to deal with it here
- lockingDb, err := runner.NewGormDbEx(cfg,
logruslog.Global.Nested("migrator db"), &dal.SessionConfig{
- PrepareStmt: false,
- SkipDefaultTransaction: true,
- })
- if err != nil {
- panic(err)
- }
- err = lockDatabase(dalgorm.NewDalgorm(lockingDb))
- if err != nil {
- panic(err)
- }
-}
diff --git a/backend/server/services/locking.go
b/backend/server/services/locking.go
index a84bcf077..3c6fe8b7a 100644
--- a/backend/server/services/locking.go
+++ b/backend/server/services/locking.go
@@ -18,6 +18,7 @@ limitations under the License.
package services
import (
+ "fmt"
"os"
"time"
@@ -32,52 +33,33 @@ var lockingTx dal.Transaction
// lockDatabase prevents multiple devlake instances from sharing the same
database
// check the models.LockingHistory for the detail
-func lockDatabase(db dal.Dal) errors.Error {
+func lockDatabase() {
+ db := basicRes.GetDal()
// first, register the instance
- err := db.AutoMigrate(&models.LockingHistory{})
- if err != nil {
- return err
- }
- hostName, e := os.Hostname()
- if e != nil {
- return errors.Convert(e)
- }
+ errors.Must(db.AutoMigrate(&models.LockingHistory{}))
+ hostName := errors.Must1(os.Hostname())
lockingHistory := &models.LockingHistory{
HostName: hostName,
Version: version.Version,
}
- err = db.Create(lockingHistory)
- if err != nil {
- return err
- }
- // 2. obtain the lock
- err = db.AutoMigrate(&models.LockingStub{})
- if err != nil {
- return err
- }
- lockingTx = db.Begin()
- c := make(chan error, 1)
-
+ errors.Must(db.Create(lockingHistory))
+ // 2. obtain the lock: using a never released transaction
 // This prevents multiple devlake instances from sharing the same
database by locking the migration history table
// However, it would not work if any older devlake instances were
already using the database.
+ lockingTx = db.Begin()
+ c := make(chan bool, 1)
go func() {
- switch db.Dialect() {
- case "mysql":
- c <- lockingTx.Exec("LOCK TABLE _devlake_locking_stub
WRITE")
- case "postgres":
- c <- lockingTx.Exec("LOCK TABLE _devlake_locking_stub
IN EXCLUSIVE MODE")
- }
+ errors.Must(lockingTx.AutoMigrate(&models.LockingStub{}))
+
errors.Must(lockingTx.LockTables(map[string]bool{"_devlake_locking_stub":
true}))
+ lockingHistory.Succeeded = true
+ errors.Must(db.Update(lockingHistory))
+ c <- true
}()
+ // 3. update the record
select {
- case err := <-c:
- if err != nil {
- return errors.Convert(err)
- }
- case <-time.After(2 * time.Second):
- return errors.Default.New("locking _devlake_locking_stub
timeout, the database might be locked by another devlake instance")
+ case <-c:
+ case <-time.After(3 * time.Second):
+ panic(fmt.Errorf("locking _devlake_locking_stub timeout, the
database might be locked by another devlake instance"))
}
- // 3. update the record
- lockingHistory.Succeeded = true
- return db.Update(lockingHistory)
}
diff --git a/backend/server/services/pipeline.go
b/backend/server/services/pipeline.go
index 73c3dbd0b..b451cf4dc 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/utils"
+ "github.com/apache/incubator-devlake/helpers/dbhelper"
"github.com/apache/incubator-devlake/impls/logruslog"
"github.com/google/uuid"
v11 "go.temporal.io/api/enums/v1"
@@ -166,67 +167,81 @@ func GetPipelineLogsArchivePath(pipeline
*models.Pipeline) (string, errors.Error
return archive, err
}
+func dequeuePipeline(runningParallelLabels []string) (pipeline
*models.Pipeline, err errors.Error) {
+ pipeline = &models.Pipeline{}
+ txHelper := dbhelper.NewTxHelper(basicRes, &err)
+ defer txHelper.End()
+ tx := txHelper.Begin()
+ // mysql read lock, not sure if it works for postgresql
+ globalPipelineLog.Debug("acquire lock")
+ errors.Must(tx.LockTables(map[string]bool{
+ "_devlake_pipelines": false,
+ "_devlake_pipeline_labels": false,
+ }))
+ // prepare query to find an appropriate pipeline to execute
+ err = tx.First(pipeline,
+ dal.Where("status IN ?", []string{models.TASK_CREATED,
models.TASK_RERUN}),
+ dal.Join(
+ `left join _devlake_pipeline_labels ON
+ _devlake_pipeline_labels.pipeline_id =
_devlake_pipelines.id AND
+ _devlake_pipeline_labels.name LIKE 'parallel/%'
AND
+ _devlake_pipeline_labels.name in ?`,
+ runningParallelLabels,
+ ),
+ dal.Groupby("id"),
+ dal.Having("count(_devlake_pipeline_labels.name)=0"),
+ dal.Select("id"),
+ dal.Orderby("id ASC"),
+ dal.Limit(1),
+ )
+ if err == nil {
+ // mark the pipeline running, now we want a write lock
+ errors.Must(tx.LockTables(map[string]bool{
+ "_devlake_pipelines": true,
+ }))
+ err = tx.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
+ {ColumnName: "status", Value: models.TASK_RUNNING},
+ {ColumnName: "message", Value: ""},
+ {ColumnName: "began_at", Value: time.Now()},
+ }, dal.Where("id = ?", pipeline.ID))
+ if err != nil {
+ panic(err)
+ }
+ return
+ }
+ if !tx.IsErrorNotFound(err) {
+ // log unexpected err
+ globalPipelineLog.Error(err, "dequeue failed")
+ }
+ return
+}
+
// RunPipelineInQueue query pipeline from db and run it in a queue
func RunPipelineInQueue(pipelineMaxParallel int64) {
sema := semaphore.NewWeighted(pipelineMaxParallel)
runningParallelLabels := []string{}
var runningParallelLabelLock sync.Mutex
+ var err error
for {
- globalPipelineLog.Info("acquire lock")
// start goroutine when sema lock ready and pipeline exist.
// to avoid read old pipeline, acquire lock before read exist
pipeline
- err := sema.Acquire(context.TODO(), 1)
- if err != nil {
- panic(err)
- }
+ errors.Must(sema.Acquire(context.TODO(), 1))
globalPipelineLog.Info("get lock and wait next pipeline")
- dbPipeline := &models.Pipeline{}
+ var dbPipeline *models.Pipeline
for {
- cronLocker.Lock()
- // prepare query to find an appropriate pipeline to
execute
- err := db.First(dbPipeline,
- dal.Where("status IN ?",
[]string{models.TASK_CREATED, models.TASK_RERUN}),
- dal.Join(
- `left join _devlake_pipeline_labels ON
-
_devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
- _devlake_pipeline_labels.name
LIKE 'parallel/%' AND
- _devlake_pipeline_labels.name
in ?`,
- runningParallelLabels,
- ),
- dal.Groupby("id"),
-
dal.Having("count(_devlake_pipeline_labels.name)=0"),
- dal.Select("id"),
- dal.Orderby("id ASC"),
- dal.Limit(1),
- )
- cronLocker.Unlock()
+ dbPipeline, err = dequeuePipeline(runningParallelLabels)
if err == nil {
- // next pipeline found
break
}
- if !db.IsErrorNotFound(err) {
- // log unexpected err
- globalPipelineLog.Error(err, "dequeue failed")
- }
time.Sleep(time.Second)
}
- // mark the pipeline running
- err = db.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
- {ColumnName: "status", Value: models.TASK_RUNNING},
- {ColumnName: "message", Value: ""},
- {ColumnName: "began_at", Value: time.Now()},
- }, dal.Where("id = ?", dbPipeline.ID))
+ err = fillPipelineDetail(dbPipeline)
if err != nil {
panic(err)
}
-
// add pipelineParallelLabels to runningParallelLabels
var pipelineParallelLabels []string
- err = fillPipelineDetail(dbPipeline)
- if err != nil {
- panic(err)
- }
for _, dbLabel := range dbPipeline.Labels {
if strings.HasPrefix(dbLabel, `parallel/`) {
pipelineParallelLabels =
append(pipelineParallelLabels, dbLabel)
@@ -376,8 +391,6 @@ func NotifyExternal(pipelineId uint64) errors.Error {
// CancelPipeline FIXME ...
func CancelPipeline(pipelineId uint64) errors.Error {
// prevent RunPipelineInQueue from consuming pending pipelines
- cronLocker.Lock()
- defer cronLocker.Unlock()
pipeline := &models.Pipeline{}
err := db.First(pipeline, dal.Where("id = ?", pipelineId))
if err != nil {
@@ -433,13 +446,20 @@ func getPipelineLogsPath(pipeline *models.Pipeline)
(string, errors.Error) {
}
// RerunPipeline would rerun all failed tasks or specified task
-func RerunPipeline(pipelineId uint64, task *models.Task) ([]*models.Task,
errors.Error) {
+func RerunPipeline(pipelineId uint64, task *models.Task) (tasks
[]*models.Task, err errors.Error) {
// prevent pipeline executor from doing anything that might jeopardize
the integrity
- cronLocker.Lock()
- defer cronLocker.Unlock()
+ pipeline := &models.Pipeline{}
+ txHelper := dbhelper.NewTxHelper(basicRes, &err)
+ tx := txHelper.Begin()
+ defer txHelper.End()
+ err = txHelper.LockTablesTimeout(2*time.Second,
map[string]bool{"_devlake_pipelines": true})
+ if err != nil {
+ err = errors.BadInput.Wrap(err, "failed to lock pipeline table,
is there any pending pipeline or deletion?")
+ return
+ }
// load the pipeline
- pipeline, err := GetPipeline(pipelineId)
+ err = tx.First(pipeline, dal.Where("id = ?", pipelineId))
if err != nil {
return nil, err
}
@@ -482,7 +502,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task)
([]*models.Task, errors
for _, t := range failedTasks {
// mark previous task failed
t.Status = models.TASK_FAILED
- err := db.UpdateColumn(t, "status", models.TASK_FAILED)
+ err := tx.UpdateColumn(t, "status", models.TASK_FAILED)
if err != nil {
return nil, err
}
@@ -514,7 +534,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task)
([]*models.Task, errors
}
 // mark pipeline rerun
- err = db.UpdateColumn(&models.Pipeline{},
+ err = tx.UpdateColumn(&models.Pipeline{},
"status", models.TASK_RERUN,
dal.Where("id = ?", pipelineId),
)
diff --git a/backend/server/services/pipeline_helper.go
b/backend/server/services/pipeline_helper.go
index c0dd11cb6..639bed861 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -20,51 +20,39 @@ package services
import (
"encoding/json"
"fmt"
+ "time"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
+ "github.com/apache/incubator-devlake/helpers/dbhelper"
)
// CreateDbPipeline returns a NewPipeline
-func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.Pipeline,
errors.Error) {
- cronLocker.Lock()
- defer cronLocker.Unlock()
+func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline
*models.Pipeline, err errors.Error) {
+ pipeline = &models.Pipeline{}
+ txHelper := dbhelper.NewTxHelper(basicRes, &err)
+ defer txHelper.End()
+ tx := txHelper.Begin()
+ errors.Must(txHelper.LockTablesTimeout(2*time.Second, map[string]bool{
+ "_devlake_pipelines": true,
+ "_devlake_pipeline_labels": true,
+ }))
+ if err != nil {
+ err = errors.BadInput.Wrap(err, "failed to lock pipeline table,
is there any pending pipeline or deletion?")
+ return
+ }
if newPipeline.BlueprintId > 0 {
- clauses := []dal.Clause{
+ count := errors.Must1(tx.Count(
dal.From(&models.Pipeline{}),
dal.Where("blueprint_id = ? AND status IN ?",
newPipeline.BlueprintId, models.PendingTaskStatus),
- }
- count, err := db.Count(clauses...)
- if err != nil {
- return nil, errors.Default.Wrap(err, "query pipelines
error")
- }
+ ))
 // some pipeline is running, get the details and output them.
if count > 0 {
- cursor, err := db.Cursor(clauses...)
- if err != nil {
- return nil, errors.Default.Wrap(err,
fmt.Sprintf("query pipelines error but count it success. count:%d", count))
- }
- defer cursor.Close()
- fetched := 0
- errstr := ""
- for cursor.Next() {
- pipeline := &models.Pipeline{}
- err = db.Fetch(cursor, pipeline)
- if err != nil {
- return nil, errors.Default.Wrap(err,
fmt.Sprintf("failed to Fetch pipelines fetched:[%d],count:[%d]", fetched,
count))
- }
- fetched++
-
- errstr += fmt.Sprintf("pipeline:[%d] on
state:[%s] Pending it\r\n", pipeline.ID, pipeline.Status)
- }
- return nil, errors.Default.New(fmt.Sprintf("the
blueprint is running fetched:[%d],count:[%d]:\r\n%s", fetched, count, errstr))
+ return nil, errors.BadInput.New("there are pending
pipelines of current blueprint already")
}
}
- planByte, err := errors.Convert01(json.Marshal(newPipeline.Plan))
- if err != nil {
- return nil, err
- }
+ planByte := errors.Must1(json.Marshal(newPipeline.Plan))
// create pipeline object from posted data
dbPipeline := &models.Pipeline{
Name: newPipeline.Name,
@@ -80,11 +68,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline)
(*models.Pipeline, errors
}
// save pipeline to database
- if err := db.Create(&dbPipeline); err != nil {
- globalPipelineLog.Error(err, "create pipeline failed: %v", err)
- return nil, errors.Internal.Wrap(err, "create pipeline failed")
- }
-
+ errors.Must(tx.Create(dbPipeline))
labels := make([]models.DbPipelineLabel, 0)
for _, label := range newPipeline.Labels {
labels = append(labels, models.DbPipelineLabel{
@@ -93,10 +77,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline)
(*models.Pipeline, errors
})
}
if len(newPipeline.Labels) > 0 {
- if err := db.Create(&labels); err != nil {
- globalPipelineLog.Error(err, "create pipeline's
labelModels failed: %v", err)
- return nil, errors.Internal.Wrap(err, "create
pipeline's labelModels failed")
- }
+ errors.Must(tx.Create(&labels))
}
// create tasks accordingly
@@ -110,28 +91,18 @@ func CreateDbPipeline(newPipeline *models.NewPipeline)
(*models.Pipeline, errors
PipelineRow: i + 1,
PipelineCol: j + 1,
}
- _, err := CreateTask(newTask)
- if err != nil {
- globalPipelineLog.Error(err, "create task for
pipeline failed: %v", err)
- return nil, err
- }
+ _ = errors.Must1(CreateTask(newTask))
// sync task state back to pipeline
dbPipeline.TotalTasks += 1
}
}
- if err != nil {
- globalPipelineLog.Error(err, "save tasks for pipeline failed:
%v", err)
- return nil, errors.Internal.Wrap(err, "save tasks for pipeline
failed")
- }
if dbPipeline.TotalTasks == 0 {
- return nil, errors.Internal.New("no task to run")
+ err = errors.BadInput.New("no task to run")
+ return
}
// update tasks state
- if err := db.Update(dbPipeline); err != nil {
- globalPipelineLog.Error(err, "update pipline state failed: %v",
err)
- return nil, errors.Internal.Wrap(err, "update pipline state
failed")
- }
+ errors.Must(tx.Update(dbPipeline))
dbPipeline.Labels = newPipeline.Labels
return dbPipeline, nil
}