This is an automated email from the ASF dual-hosted git repository.
ka94 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new f10947616 refactor: better readability for LockTables (#5662)
f10947616 is described below
commit f109476168a32a1ea15b8591f436290f7ab64b07
Author: Klesh Wong <[email protected]>
AuthorDate: Thu Jul 13 00:43:28 2023 +0800
refactor: better readability for LockTables (#5662)
---
backend/core/dal/dal.go | 20 +++++++++++++++++++-
backend/helpers/dbhelper/txhelper.go | 6 +++---
.../helpers/pluginhelper/api/scope_generic_helper.go | 2 +-
backend/impls/dalgorm/dalgorm_transaction.go | 14 +++++++-------
backend/server/services/locking.go | 2 +-
backend/server/services/pipeline.go | 12 +++++-------
backend/server/services/pipeline_helper.go | 10 ++++++----
7 files changed, 42 insertions(+), 24 deletions(-)
diff --git a/backend/core/dal/dal.go b/backend/core/dal/dal.go
index c4bddd1d4..4a5b97b62 100644
--- a/backend/core/dal/dal.go
+++ b/backend/core/dal/dal.go
@@ -183,12 +183,30 @@ type Dal interface {
RawCursor(query string, params ...interface{}) (*sql.Rows, errors.Error)
}
+type LockTable struct {
+ Table string
+ Tabler Tabler
+ Exclusive bool
+}
+
+func (l *LockTable) TableName() string {
+ if l.Table != "" {
+ return l.Table
+ }
+ if l.Tabler != nil {
+ return l.Tabler.TableName()
+ }
+ panic("either Table or Tabler must be specified")
+}
+
+type LockTables []*LockTable
+
type Transaction interface {
Dal
Rollback() errors.Error
Commit() errors.Error
// table: exclusive
- LockTables(tables map[string]bool) errors.Error
+ LockTables(lockTables LockTables) errors.Error
UnlockTables() errors.Error
// End(err *errors.Error)
}
diff --git a/backend/helpers/dbhelper/txhelper.go b/backend/helpers/dbhelper/txhelper.go
index 307399abd..dc3c7e125 100644
--- a/backend/helpers/dbhelper/txhelper.go
+++ b/backend/helpers/dbhelper/txhelper.go
@@ -44,11 +44,11 @@ func (l *TxHelper[E]) Begin() dal.Transaction {
}
// LockTablesTimeout locks tables with timeout
-func (l *TxHelper[E]) LockTablesTimeout(timeout time.Duration, tables
map[string]bool) errors.Error {
+func (l *TxHelper[E]) LockTablesTimeout(timeout time.Duration, lockTables
dal.LockTables) errors.Error {
println("timeout", timeout)
c := make(chan errors.Error, 1)
go func() {
- c <- l.tx.LockTables(tables)
+ c <- l.tx.LockTables(lockTables)
close(c)
}()
@@ -58,7 +58,7 @@ func (l *TxHelper[E]) LockTablesTimeout(timeout time.Duration, tables map[string
panic(err)
}
case <-time.After(timeout):
- return errors.Timeout.New("lock tables timeout: " +
fmt.Sprintf("%v", tables))
+ return errors.Timeout.New("lock tables timeout: " +
fmt.Sprintf("%v", lockTables))
}
return nil
}
diff --git a/backend/helpers/pluginhelper/api/scope_generic_helper.go b/backend/helpers/pluginhelper/api/scope_generic_helper.go
index 2edfc3dad..ba0d90b22 100644
--- a/backend/helpers/pluginhelper/api/scope_generic_helper.go
+++ b/backend/helpers/pluginhelper/api/scope_generic_helper.go
@@ -301,7 +301,7 @@ func (gs *GenericScopeApiHelper[Conn, Scope, ScopeConfig]) DeleteScope(input *pl
txHelper := dbhelper.NewTxHelper(gs.basicRes, &err)
defer txHelper.End()
tx := txHelper.Begin()
- err = txHelper.LockTablesTimeout(2*time.Second,
map[string]bool{"_devlake_pipelines": false})
+ err = txHelper.LockTablesTimeout(2*time.Second, dal.LockTables{{Table:
"_devlake_pipelines"}})
if err != nil {
err = errors.Conflict.Wrap(err, "This data scope cannot be
deleted due to a table lock error. There might be running pipeline(s) or other
deletion operations in progress.")
return
diff --git a/backend/impls/dalgorm/dalgorm_transaction.go b/backend/impls/dalgorm/dalgorm_transaction.go
index 6426a1053..ed839e06a 100644
--- a/backend/impls/dalgorm/dalgorm_transaction.go
+++ b/backend/impls/dalgorm/dalgorm_transaction.go
@@ -50,17 +50,17 @@ func (t *DalgormTransaction) Commit() errors.Error {
return nil
}
-func (t *DalgormTransaction) LockTables(tables map[string]bool) errors.Error {
+func (t *DalgormTransaction) LockTables(lockTables dal.LockTables)
errors.Error {
switch t.Dialect() {
case "mysql":
// mysql lock all tables at once, each lock would release all
previous locks
clause := ""
- for table, exclusive := range tables {
+ for _, lockTable := range lockTables {
if clause != "" {
clause += ", "
}
- clause += table
- if exclusive {
+ clause += lockTable.TableName()
+ if lockTable.Exclusive {
clause += " WRITE"
} else {
clause += " READ"
@@ -69,12 +69,12 @@ func (t *DalgormTransaction) LockTables(tables map[string]bool) errors.Error {
return t.Exec(fmt.Sprintf("LOCK TABLES %s", clause))
case "postgres":
clause := ""
- for table, exclusive := range tables {
+ for _, lockTable := range lockTables {
if clause != "" {
clause += ", "
}
- clause += table
- if exclusive {
+ clause += lockTable.TableName()
+ if lockTable.Exclusive {
clause += " IN EXCLUSIVE MODE"
} else {
clause += " IN SHARE MODE"
diff --git a/backend/server/services/locking.go b/backend/server/services/locking.go
index 3c6fe8b7a..6a74ad4da 100644
--- a/backend/server/services/locking.go
+++ b/backend/server/services/locking.go
@@ -50,7 +50,7 @@ func lockDatabase() {
c := make(chan bool, 1)
go func() {
errors.Must(lockingTx.AutoMigrate(&models.LockingStub{}))
-
errors.Must(lockingTx.LockTables(map[string]bool{"_devlake_locking_stub":
true}))
+ errors.Must(lockingTx.LockTables(dal.LockTables{{Table:
"_devlake_locking_stub", Exclusive: true}}))
lockingHistory.Succeeded = true
errors.Must(db.Update(lockingHistory))
c <- true
diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index fd1b3e81b..530837e3e 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -173,9 +173,9 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline,
defer txHelper.End()
tx := txHelper.Begin()
// mysql read lock, not sure if it works for postgresql
- errors.Must(tx.LockTables(map[string]bool{
- "_devlake_pipelines": false,
- "_devlake_pipeline_labels": false,
+ errors.Must(tx.LockTables(dal.LockTables{
+ {Table: "_devlake_pipelines", Exclusive: false},
+ {Table: "_devlake_pipeline_labels", Exclusive: false},
}))
// prepare query to find an appropriate pipeline to execute
err = tx.First(pipeline,
@@ -195,9 +195,7 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline,
)
if err == nil {
// mark the pipeline running, now we want a write lock
- errors.Must(tx.LockTables(map[string]bool{
- "_devlake_pipelines": true,
- }))
+ errors.Must(tx.LockTables(dal.LockTables{{Table:
"_devlake_pipelines", Exclusive: true}}))
err = tx.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
{ColumnName: "status", Value: models.TASK_RUNNING},
{ColumnName: "message", Value: ""},
@@ -451,7 +449,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task,
txHelper := dbhelper.NewTxHelper(basicRes, &err)
tx := txHelper.Begin()
defer txHelper.End()
- err = txHelper.LockTablesTimeout(2*time.Second,
map[string]bool{"_devlake_pipelines": true})
+ err = txHelper.LockTablesTimeout(2*time.Second, dal.LockTables{{Table:
"_devlake_pipelines", Exclusive: true}})
if err != nil {
err = errors.BadInput.Wrap(err, "failed to lock pipeline table,
is there any pending pipeline or deletion?")
return
diff --git a/backend/server/services/pipeline_helper.go b/backend/server/services/pipeline_helper.go
index 639bed861..3221491dd 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -34,10 +34,12 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin
txHelper := dbhelper.NewTxHelper(basicRes, &err)
defer txHelper.End()
tx := txHelper.Begin()
- errors.Must(txHelper.LockTablesTimeout(2*time.Second, map[string]bool{
- "_devlake_pipelines": true,
- "_devlake_pipeline_labels": true,
- }))
+ errors.Must(txHelper.LockTablesTimeout(2*time.Second,
+ dal.LockTables{
+ {Table: "_devlake_pipelines", Exclusive: true},
+ {Table: "_devlake_pipeline_labels", Exclusive: true},
+ },
+ ))
if err != nil {
err = errors.BadInput.Wrap(err, "failed to lock pipeline table,
is there any pending pipeline or deletion?")
return