This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a19d7019cbb [fix](regression) fix some txn_insert and group_commit 
case (#37928)
a19d7019cbb is described below

commit a19d7019cbb86951a60cc6a6666fa9b542fa2abb
Author: meiyi <[email protected]>
AuthorDate: Thu Jul 18 22:19:03 2024 +0800

    [fix](regression) fix some txn_insert and group_commit case (#37928)
    
    ## Proposed changes
    
    1. The `TLoadTxnCommitRequest` used for multi-table loads should sort its
    tables by table id, because publish must acquire the table write locks
    in a consistent order to prevent deadlock.
    2. Publishing a transaction that spans multiple tables may take a long time.
    Because there is only one publish thread, this blocks other transactions.
    
        Here is how it happens:
    * The txn contains two tables, t0 and t1. Publish acquires the `t0 write
    lock` and then waits for the `t1 write lock`.
    * At the same time, a client concurrently runs `insert into t0 select from t1`.
    The Nereids planner holds the `t1 read lock` and waits for the `t0 read
    lock` until it times out after 1 minute.
    * The `t1 read lock` is also held by many load threads that keep arriving
    from clients, so publish waits a long time for the `t1 write lock`.
    
       The fix:
    * During publish, acquire the table write locks with a timeout. If that
    fails, publish retries in the next loop, so one txn no longer blocks the
    other txns.
    * In the Nereids planner, acquire table read locks in table-id order so
    that lock conflicts are detected more promptly.
---
 .../apache/doris/common/util/MetaLockUtils.java    | 13 +++++
 .../org/apache/doris/nereids/CascadesContext.java  |  5 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  4 ++
 .../doris/transaction/DatabaseTransactionMgr.java  |  6 ++-
 .../test_group_commit_and_wal_back_pressure.out    | 10 ----
 .../org/apache/doris/regression/suite/Suite.groovy |  5 +-
 .../insert_p0/insert_group_commit_into.groovy      | 18 +++----
 .../test_group_commit_data_bytes_property.groovy   | 22 +++++++-
 .../insert_p0/txn_insert_concurrent_insert.groovy  |  2 +-
 .../test_group_commit_and_wal_back_pressure.groovy | 63 ++++++++--------------
 10 files changed, 80 insertions(+), 68 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
index 084c6f25f79..16afbcecdae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
@@ -74,6 +74,19 @@ public class MetaLockUtils {
         return lockedTablesList;
     }
 
+    public static boolean tryWriteLockTablesIfExist(List<? extends TableIf> 
tableList, long timeout,
+            TimeUnit unit) {
+        for (int i = 0; i < tableList.size(); i++) {
+            if (!tableList.get(i).tryWriteLockIfExist(timeout, unit)) {
+                for (int j = i - 1; j >= 0; j--) {
+                    tableList.get(j).writeUnlock();
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
     public static void writeLockTablesOrMetaException(List<? extends TableIf> 
tableList) throws MetaNotFoundException {
         for (int i = 0; i < tableList.size(); i++) {
             try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
index a0d748c08c7..5c9d2b7a01a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
@@ -261,7 +261,8 @@ public class CascadesContext implements ScheduleContext {
     }
 
     public void setTables(List<TableIf> tables) {
-        this.tables = tables.stream().collect(Collectors.toMap(TableIf::getId, 
t -> t, (t1, t2) -> t1));
+        this.tables = tables.stream()
+                .collect(Collectors.toMap(TableIf::getId, t -> t, (t1, t2) -> 
t1, () -> Maps.newTreeMap()));
     }
 
     public final ConnectContext getConnectContext() {
@@ -399,7 +400,7 @@ public class CascadesContext implements ScheduleContext {
      */
     public void extractTables(LogicalPlan logicalPlan) {
         Set<List<String>> tableNames = getTables(logicalPlan);
-        tables = Maps.newHashMap();
+        tables = Maps.newTreeMap();
         for (List<String> tableName : tableNames) {
             try {
                 TableIf table = getTable(tableName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8f34fed5e5f..130d440fcda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -269,6 +269,7 @@ import org.apache.thrift.TException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -1416,6 +1417,9 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             OlapTable table = (OlapTable) db.getTableOrMetaException(tbl, 
TableType.OLAP);
             tables.add(table);
         }
+        if (tables.size() > 1) {
+            tables.sort(Comparator.comparing(Table::getId));
+        }
         // if it has multi table, use multi table and update multi table 
running
         // transaction table ids
         if (CollectionUtils.isNotEmpty(request.getTbls())) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 6b30ed507f7..5944170cb3d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -91,6 +91,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -1095,7 +1096,10 @@ public class DatabaseTransactionMgr {
             LOG.debug("finish transaction {} with tables {}", transactionId, 
tableIdList);
         }
         List<? extends TableIf> tableList = 
db.getTablesOnIdOrderIfExist(tableIdList);
-        tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
+        if (!MetaLockUtils.tryWriteLockTablesIfExist(tableList, 10, 
TimeUnit.SECONDS)) {
+            LOG.warn("finish transaction {} failed, get lock timeout with 
tables {}", transactionId, tableIdList);
+            return;
+        }
         PublishResult publishResult;
         try {
             // add all commit errors and publish errors to a single set
diff --git 
a/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out
 
b/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out
deleted file mode 100644
index 4b064bec44e..00000000000
--- 
a/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out
+++ /dev/null
@@ -1,10 +0,0 @@
--- This file is automatically generated. You should know what you did if you 
want to edit this
--- !1 --
-1
-
--- !2 --
-1
-
--- !3 --
-1
-
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 1685b1e4d46..8397a638c36 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1807,11 +1807,12 @@ class Suite implements GroovyInterceptable {
             def last_end_version = -1
             for(def rowset : rowsets) {
                 def version_str = rowset.substring(1, rowset.indexOf("]"))
-                logger.info("version_str: $version_str")
                 def versions = version_str.split("-")
                 def start_version = versions[0].toLong()
                 def end_version = versions[1].toLong()
-                logger.info("cur_version:[$start_version - $end_version], 
last_version:[$last_start_version - $last_end_version]")
+                if (last_end_version + 1 != start_version) {
+                    logger.warn("last_version:[$last_start_version - 
$last_end_version], cur_version:[$start_version - $end_version], version_str: 
$version_str")
+                }
                 assertEquals(last_end_version + 1, start_version)
                 last_start_version = start_version
                 last_end_version = end_version
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy 
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 6476d2736e9..3b4bf78846f 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -16,7 +16,8 @@
 // under the License.
 
 import com.mysql.cj.jdbc.StatementImpl
-import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
 
 suite("insert_group_commit_into") {
     def dbName = "regression_test_insert_p0"
@@ -24,16 +25,13 @@ suite("insert_group_commit_into") {
     def table = dbName + "." + tableName
 
     def getRowCount = { expectedRowCount ->
-        def retry = 0
-        while (retry < 30) {
-            sleep(2000)
-            def rowCount = sql "select count(*) from ${table}"
-            logger.info("rowCount: " + rowCount + ", retry: " + retry)
-            if (rowCount[0][0] >= expectedRowCount) {
-                break
+        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until(
+            {
+                def result = sql "select count(*) from ${table}"
+                logger.info("table: ${table}, rowCount: ${result}")
+                return result[0][0] == expectedRowCount
             }
-            retry++
-        }
+        )
     }
 
     def getAlterTableState = {
diff --git 
a/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy 
b/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy
index d4f98e13d02..fe603c93ce9 100644
--- 
a/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy
+++ 
b/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy
@@ -18,6 +18,9 @@
 import com.mysql.cj.jdbc.StatementImpl
 import org.codehaus.groovy.runtime.IOGroovyMethods
 
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
 suite("test_group_commit_data_bytes_property") {
 
     def dbName = "regression_test_insert_p0"
@@ -88,7 +91,24 @@ suite("test_group_commit_data_bytes_property") {
 
             def msg3 = group_commit_insert """insert into ${test_table} 
values(3,3); """, 1
 
-            def msg4 = group_commit_insert """insert into ${test_table} 
values(4,4); """, 1
+            // add a retry for can not get a block queue because the data 
bytes is too small
+            def msg4 = ""
+            Awaitility.await().atMost(10, SECONDS).until(
+                {
+                    try {
+                        sql """ set group_commit = async_mode; """
+                        msg4 = group_commit_insert """insert into 
${test_table} values(4,4); """, 1
+                        return true
+                    } catch (Exception e) {
+                        logger.info("get exception: ${e.getMessage()}")
+                        if (e.getMessage().contains("can not get a block 
queue")) {
+                            return false
+                        } else {
+                            throw e
+                        }
+                    }
+                }
+            )
 
             assertNotEquals(msg3.substring(msg3.indexOf("group_commit")+11, 
msg3.indexOf("group_commit")+43), 
msg4.substring(msg4.indexOf("group_commit")+11, 
msg4.indexOf("group_commit")+43));
 
diff --git 
a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy 
b/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
index 3e84a9f56e8..c28f9192258 100644
--- a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
@@ -109,7 +109,7 @@ suite("txn_insert_concurrent_insert") {
         futures.add(future)
     }
     CompletableFuture<?>[] futuresArray = futures.toArray(new 
CompletableFuture[0])
-    CompletableFuture.allOf(futuresArray).get(2, TimeUnit.MINUTES)
+    CompletableFuture.allOf(futuresArray).get(3, TimeUnit.MINUTES)
     sql """ sync """
 
     def result = sql """ select count() from ${tableName}_0 """
diff --git 
a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
 
b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
index 5ec3cab11d0..bd443b6eab7 100644
--- 
a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
@@ -15,38 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_group_commit_and_wal_back_pressure") {
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
 
-    def tableName = "test_group_commit_and_wal_back_pressure"
-    sql """ DROP TABLE IF EXISTS ${tableName}1 """
-    sql """
-            CREATE TABLE ${tableName}1 (
-                k bigint,  
-                v string
-                )  
-                UNIQUE KEY(k)  
-                DISTRIBUTED BY HASH (k) BUCKETS 32  
-                PROPERTIES(  
-                "replication_num" = "1"
-            );
-    """
+suite("test_group_commit_and_wal_back_pressure") {
 
-    sql """ DROP TABLE IF EXISTS ${tableName}2 """
-    sql """
-            CREATE TABLE ${tableName}2 (
-                k bigint,  
-                v string
-                )  
-                UNIQUE KEY(k)  
-                DISTRIBUTED BY HASH (k) BUCKETS 32  
-                PROPERTIES(  
-                "replication_num" = "1"
-            );
-    """
+    def getRowCount = { table, expectedRowCount ->
+        Awaitility.await().atMost(90, SECONDS).pollInterval(2, SECONDS).until(
+            {
+                def result = sql "select count(*) from ${table}"
+                logger.info("table: ${table}, rowCount: ${result}")
+                return result[0][0] == expectedRowCount
+            }
+        )
+    }
 
-    sql """ DROP TABLE IF EXISTS ${tableName}3 """
-    sql """
-            CREATE TABLE ${tableName}3 (
+    def tableName = "test_group_commit_and_wal_back_pressure"
+    for (int j = 1; j <= 3; j++) {
+        sql """ DROP TABLE IF EXISTS ${tableName}${j} """
+        sql """
+            CREATE TABLE ${tableName}${j} (
                 k bigint,  
                 v string
                 )  
@@ -55,7 +43,8 @@ suite("test_group_commit_and_wal_back_pressure") {
                 PROPERTIES(  
                 "replication_num" = "1"
             );
-    """
+        """
+    }
 
     def t1 = []
     for (int i = 0; i < 20; i++) {
@@ -122,16 +111,8 @@ suite("test_group_commit_and_wal_back_pressure") {
     for (Thread th in t3) {
         th.join()
     }
-
-    // wait for group commit
-    Thread.sleep(10000)
-
     sql "sync"
-
-    qt_1 """ select count(*) from ${tableName}1;"""
-
-    qt_2 """ select count(*) from ${tableName}2;"""
-
-    qt_3 """ select count(*) from ${tableName}3;"""
-
+    getRowCount "${tableName}1", 1
+    getRowCount "${tableName}2", 1
+    getRowCount "${tableName}3", 1
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to