This is an automated email from the ASF dual-hosted git repository.

ka94 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new f10947616 refactor: better readibility for LockTables (#5662)
f10947616 is described below

commit f109476168a32a1ea15b8591f436290f7ab64b07
Author: Klesh Wong <[email protected]>
AuthorDate: Thu Jul 13 00:43:28 2023 +0800

    refactor: better readibility for LockTables (#5662)
---
 backend/core/dal/dal.go                              | 20 +++++++++++++++++++-
 backend/helpers/dbhelper/txhelper.go                 |  6 +++---
 .../helpers/pluginhelper/api/scope_generic_helper.go |  2 +-
 backend/impls/dalgorm/dalgorm_transaction.go         | 14 +++++++-------
 backend/server/services/locking.go                   |  2 +-
 backend/server/services/pipeline.go                  | 12 +++++-------
 backend/server/services/pipeline_helper.go           | 10 ++++++----
 7 files changed, 42 insertions(+), 24 deletions(-)

diff --git a/backend/core/dal/dal.go b/backend/core/dal/dal.go
index c4bddd1d4..4a5b97b62 100644
--- a/backend/core/dal/dal.go
+++ b/backend/core/dal/dal.go
@@ -183,12 +183,30 @@ type Dal interface {
        RawCursor(query string, params ...interface{}) (*sql.Rows, errors.Error)
 }
 
+type LockTable struct {
+       Table     string
+       Tabler    Tabler
+       Exclusive bool
+}
+
+func (l *LockTable) TableName() string {
+       if l.Table != "" {
+               return l.Table
+       }
+       if l.Tabler != nil {
+               return l.Tabler.TableName()
+       }
+       panic("either Table or Tabler must be specified")
+}
+
+type LockTables []*LockTable
+
 type Transaction interface {
        Dal
        Rollback() errors.Error
        Commit() errors.Error
        // table: exclusive
-       LockTables(tables map[string]bool) errors.Error
+       LockTables(lockTables LockTables) errors.Error
        UnlockTables() errors.Error
        // End(err *errors.Error)
 }
diff --git a/backend/helpers/dbhelper/txhelper.go 
b/backend/helpers/dbhelper/txhelper.go
index 307399abd..dc3c7e125 100644
--- a/backend/helpers/dbhelper/txhelper.go
+++ b/backend/helpers/dbhelper/txhelper.go
@@ -44,11 +44,11 @@ func (l *TxHelper[E]) Begin() dal.Transaction {
 }
 
 // LockTablesTimeout locks tables with timeout
-func (l *TxHelper[E]) LockTablesTimeout(timeout time.Duration, tables 
map[string]bool) errors.Error {
+func (l *TxHelper[E]) LockTablesTimeout(timeout time.Duration, lockTables 
dal.LockTables) errors.Error {
        println("timeout", timeout)
        c := make(chan errors.Error, 1)
        go func() {
-               c <- l.tx.LockTables(tables)
+               c <- l.tx.LockTables(lockTables)
                close(c)
        }()
 
@@ -58,7 +58,7 @@ func (l *TxHelper[E]) LockTablesTimeout(timeout 
time.Duration, tables map[string
                        panic(err)
                }
        case <-time.After(timeout):
-               return errors.Timeout.New("lock tables timeout: " + 
fmt.Sprintf("%v", tables))
+               return errors.Timeout.New("lock tables timeout: " + 
fmt.Sprintf("%v", lockTables))
        }
        return nil
 }
diff --git a/backend/helpers/pluginhelper/api/scope_generic_helper.go 
b/backend/helpers/pluginhelper/api/scope_generic_helper.go
index 2edfc3dad..ba0d90b22 100644
--- a/backend/helpers/pluginhelper/api/scope_generic_helper.go
+++ b/backend/helpers/pluginhelper/api/scope_generic_helper.go
@@ -301,7 +301,7 @@ func (gs *GenericScopeApiHelper[Conn, Scope, ScopeConfig]) 
DeleteScope(input *pl
        txHelper := dbhelper.NewTxHelper(gs.basicRes, &err)
        defer txHelper.End()
        tx := txHelper.Begin()
-       err = txHelper.LockTablesTimeout(2*time.Second, 
map[string]bool{"_devlake_pipelines": false})
+       err = txHelper.LockTablesTimeout(2*time.Second, dal.LockTables{{Table: 
"_devlake_pipelines"}})
        if err != nil {
                err = errors.Conflict.Wrap(err, "This data scope cannot be 
deleted due to a table lock error. There might be running pipeline(s) or other 
deletion operations in progress.")
                return
diff --git a/backend/impls/dalgorm/dalgorm_transaction.go 
b/backend/impls/dalgorm/dalgorm_transaction.go
index 6426a1053..ed839e06a 100644
--- a/backend/impls/dalgorm/dalgorm_transaction.go
+++ b/backend/impls/dalgorm/dalgorm_transaction.go
@@ -50,17 +50,17 @@ func (t *DalgormTransaction) Commit() errors.Error {
        return nil
 }
 
-func (t *DalgormTransaction) LockTables(tables map[string]bool) errors.Error {
+func (t *DalgormTransaction) LockTables(lockTables dal.LockTables) 
errors.Error {
        switch t.Dialect() {
        case "mysql":
                // mysql lock all tables at once, each lock would release all 
previous locks
                clause := ""
-               for table, exclusive := range tables {
+               for _, lockTable := range lockTables {
                        if clause != "" {
                                clause += ", "
                        }
-                       clause += table
-                       if exclusive {
+                       clause += lockTable.TableName()
+                       if lockTable.Exclusive {
                                clause += " WRITE"
                        } else {
                                clause += " READ"
@@ -69,12 +69,12 @@ func (t *DalgormTransaction) LockTables(tables 
map[string]bool) errors.Error {
                return t.Exec(fmt.Sprintf("LOCK TABLES %s", clause))
        case "postgres":
                clause := ""
-               for table, exclusive := range tables {
+               for _, lockTable := range lockTables {
                        if clause != "" {
                                clause += ", "
                        }
-                       clause += table
-                       if exclusive {
+                       clause += lockTable.TableName()
+                       if lockTable.Exclusive {
                                clause += "  IN EXCLUSIVE MODE"
                        } else {
                                clause += "  IN SHARE MODE"
diff --git a/backend/server/services/locking.go 
b/backend/server/services/locking.go
index 3c6fe8b7a..6a74ad4da 100644
--- a/backend/server/services/locking.go
+++ b/backend/server/services/locking.go
@@ -50,7 +50,7 @@ func lockDatabase() {
        c := make(chan bool, 1)
        go func() {
                errors.Must(lockingTx.AutoMigrate(&models.LockingStub{}))
-               
errors.Must(lockingTx.LockTables(map[string]bool{"_devlake_locking_stub": 
true}))
+               errors.Must(lockingTx.LockTables(dal.LockTables{{Table: 
"_devlake_locking_stub", Exclusive: true}}))
                lockingHistory.Succeeded = true
                errors.Must(db.Update(lockingHistory))
                c <- true
diff --git a/backend/server/services/pipeline.go 
b/backend/server/services/pipeline.go
index fd1b3e81b..530837e3e 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -173,9 +173,9 @@ func dequeuePipeline(runningParallelLabels []string) 
(pipeline *models.Pipeline,
        defer txHelper.End()
        tx := txHelper.Begin()
        // mysql read lock, not sure if it works for postgresql
-       errors.Must(tx.LockTables(map[string]bool{
-               "_devlake_pipelines":       false,
-               "_devlake_pipeline_labels": false,
+       errors.Must(tx.LockTables(dal.LockTables{
+               {Table: "_devlake_pipelines", Exclusive: false},
+               {Table: "_devlake_pipeline_labels", Exclusive: false},
        }))
        // prepare query to find an appropriate pipeline to execute
        err = tx.First(pipeline,
@@ -195,9 +195,7 @@ func dequeuePipeline(runningParallelLabels []string) 
(pipeline *models.Pipeline,
        )
        if err == nil {
                // mark the pipeline running, now we want a write lock
-               errors.Must(tx.LockTables(map[string]bool{
-                       "_devlake_pipelines": true,
-               }))
+               errors.Must(tx.LockTables(dal.LockTables{{Table: 
"_devlake_pipelines", Exclusive: true}}))
                err = tx.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
                        {ColumnName: "status", Value: models.TASK_RUNNING},
                        {ColumnName: "message", Value: ""},
@@ -451,7 +449,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) 
(tasks []*models.Task,
        txHelper := dbhelper.NewTxHelper(basicRes, &err)
        tx := txHelper.Begin()
        defer txHelper.End()
-       err = txHelper.LockTablesTimeout(2*time.Second, 
map[string]bool{"_devlake_pipelines": true})
+       err = txHelper.LockTablesTimeout(2*time.Second, dal.LockTables{{Table: 
"_devlake_pipelines", Exclusive: true}})
        if err != nil {
                err = errors.BadInput.Wrap(err, "failed to lock pipeline table, 
is there any pending pipeline or deletion?")
                return
diff --git a/backend/server/services/pipeline_helper.go 
b/backend/server/services/pipeline_helper.go
index 639bed861..3221491dd 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -34,10 +34,12 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(pipeline *models.Pipelin
        txHelper := dbhelper.NewTxHelper(basicRes, &err)
        defer txHelper.End()
        tx := txHelper.Begin()
-       errors.Must(txHelper.LockTablesTimeout(2*time.Second, map[string]bool{
-               "_devlake_pipelines":       true,
-               "_devlake_pipeline_labels": true,
-       }))
+       errors.Must(txHelper.LockTablesTimeout(2*time.Second,
+               dal.LockTables{
+                       {Table: "_devlake_pipelines", Exclusive: true},
+                       {Table: "_devlake_pipeline_labels", Exclusive: true},
+               },
+       ))
        if err != nil {
                err = errors.BadInput.Wrap(err, "failed to lock pipeline table, 
is there any pending pipeline or deletion?")
                return

Reply via email to