This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a19d7019cbb [fix](regression) fix some txn_insert and group_commit
case (#37928)
a19d7019cbb is described below
commit a19d7019cbb86951a60cc6a6666fa9b542fa2abb
Author: meiyi <[email protected]>
AuthorDate: Thu Jul 18 22:19:03 2024 +0800
[fix](regression) fix some txn_insert and group_commit case (#37928)
## Proposed changes
1. The `TLoadTxnCommitRequest` used for multi-table load should sort
tables by table id, because publish acquires the table write locks in
that order to prevent deadlock.
2. Publishing a transaction that spans multiple tables may take a long time.
Because there is only one publish thread, this blocks the other txns.
Here is how it happens:
* The txn contains 2 tables, t0 and t1; publish acquires the `t0 write lock` and
waits for the `t1 write lock`.
* At the same time, a client concurrently runs `insert into t0 select from t1`.
The Nereids planner holds the `t1 read lock` and waits for the `t0
read lock` until it times out after 1 minute.
* The `t1 read lock` is also held by multiple load threads that keep
arriving from clients, so publish waits a long time for the `t1 write lock`.
The fix:
* When publishing, acquire the table write locks with a timeout; if that fails,
publish retries in the next loop, so that one txn does not block the other
txns.
* In the Nereids planner, acquire table read locks in table-id order so that lock
conflicts are detected more promptly.
---
.../apache/doris/common/util/MetaLockUtils.java | 13 +++++
.../org/apache/doris/nereids/CascadesContext.java | 5 +-
.../apache/doris/service/FrontendServiceImpl.java | 4 ++
.../doris/transaction/DatabaseTransactionMgr.java | 6 ++-
.../test_group_commit_and_wal_back_pressure.out | 10 ----
.../org/apache/doris/regression/suite/Suite.groovy | 5 +-
.../insert_p0/insert_group_commit_into.groovy | 18 +++----
.../test_group_commit_data_bytes_property.groovy | 22 +++++++-
.../insert_p0/txn_insert_concurrent_insert.groovy | 2 +-
.../test_group_commit_and_wal_back_pressure.groovy | 63 ++++++++--------------
10 files changed, 80 insertions(+), 68 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
index 084c6f25f79..16afbcecdae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
@@ -74,6 +74,19 @@ public class MetaLockUtils {
return lockedTablesList;
}
+ public static boolean tryWriteLockTablesIfExist(List<? extends TableIf>
tableList, long timeout,
+ TimeUnit unit) {
+ for (int i = 0; i < tableList.size(); i++) {
+ if (!tableList.get(i).tryWriteLockIfExist(timeout, unit)) {
+ for (int j = i - 1; j >= 0; j--) {
+ tableList.get(j).writeUnlock();
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+
public static void writeLockTablesOrMetaException(List<? extends TableIf>
tableList) throws MetaNotFoundException {
for (int i = 0; i < tableList.size(); i++) {
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
index a0d748c08c7..5c9d2b7a01a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
@@ -261,7 +261,8 @@ public class CascadesContext implements ScheduleContext {
}
public void setTables(List<TableIf> tables) {
- this.tables = tables.stream().collect(Collectors.toMap(TableIf::getId,
t -> t, (t1, t2) -> t1));
+ this.tables = tables.stream()
+ .collect(Collectors.toMap(TableIf::getId, t -> t, (t1, t2) ->
t1, () -> Maps.newTreeMap()));
}
public final ConnectContext getConnectContext() {
@@ -399,7 +400,7 @@ public class CascadesContext implements ScheduleContext {
*/
public void extractTables(LogicalPlan logicalPlan) {
Set<List<String>> tableNames = getTables(logicalPlan);
- tables = Maps.newHashMap();
+ tables = Maps.newTreeMap();
for (List<String> tableName : tableNames) {
try {
TableIf table = getTable(tableName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8f34fed5e5f..130d440fcda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -269,6 +269,7 @@ import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -1416,6 +1417,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
OlapTable table = (OlapTable) db.getTableOrMetaException(tbl,
TableType.OLAP);
tables.add(table);
}
+ if (tables.size() > 1) {
+ tables.sort(Comparator.comparing(Table::getId));
+ }
// if it has multi table, use multi table and update multi table
running
// transaction table ids
if (CollectionUtils.isNotEmpty(request.getTbls())) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 6b30ed507f7..5944170cb3d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -91,6 +91,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@@ -1095,7 +1096,10 @@ public class DatabaseTransactionMgr {
LOG.debug("finish transaction {} with tables {}", transactionId,
tableIdList);
}
List<? extends TableIf> tableList =
db.getTablesOnIdOrderIfExist(tableIdList);
- tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
+ if (!MetaLockUtils.tryWriteLockTablesIfExist(tableList, 10,
TimeUnit.SECONDS)) {
+ LOG.warn("finish transaction {} failed, get lock timeout with
tables {}", transactionId, tableIdList);
+ return;
+ }
PublishResult publishResult;
try {
// add all commit errors and publish errors to a single set
diff --git
a/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out
b/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out
deleted file mode 100644
index 4b064bec44e..00000000000
---
a/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out
+++ /dev/null
@@ -1,10 +0,0 @@
--- This file is automatically generated. You should know what you did if you
want to edit this
--- !1 --
-1
-
--- !2 --
-1
-
--- !3 --
-1
-
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 1685b1e4d46..8397a638c36 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1807,11 +1807,12 @@ class Suite implements GroovyInterceptable {
def last_end_version = -1
for(def rowset : rowsets) {
def version_str = rowset.substring(1, rowset.indexOf("]"))
- logger.info("version_str: $version_str")
def versions = version_str.split("-")
def start_version = versions[0].toLong()
def end_version = versions[1].toLong()
- logger.info("cur_version:[$start_version - $end_version],
last_version:[$last_start_version - $last_end_version]")
+ if (last_end_version + 1 != start_version) {
+ logger.warn("last_version:[$last_start_version -
$last_end_version], cur_version:[$start_version - $end_version], version_str:
$version_str")
+ }
assertEquals(last_end_version + 1, start_version)
last_start_version = start_version
last_end_version = end_version
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 6476d2736e9..3b4bf78846f 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -16,7 +16,8 @@
// under the License.
import com.mysql.cj.jdbc.StatementImpl
-import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
suite("insert_group_commit_into") {
def dbName = "regression_test_insert_p0"
@@ -24,16 +25,13 @@ suite("insert_group_commit_into") {
def table = dbName + "." + tableName
def getRowCount = { expectedRowCount ->
- def retry = 0
- while (retry < 30) {
- sleep(2000)
- def rowCount = sql "select count(*) from ${table}"
- logger.info("rowCount: " + rowCount + ", retry: " + retry)
- if (rowCount[0][0] >= expectedRowCount) {
- break
+ Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until(
+ {
+ def result = sql "select count(*) from ${table}"
+ logger.info("table: ${table}, rowCount: ${result}")
+ return result[0][0] == expectedRowCount
}
- retry++
- }
+ )
}
def getAlterTableState = {
diff --git
a/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy
b/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy
index d4f98e13d02..fe603c93ce9 100644
---
a/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy
+++
b/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy
@@ -18,6 +18,9 @@
import com.mysql.cj.jdbc.StatementImpl
import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
suite("test_group_commit_data_bytes_property") {
def dbName = "regression_test_insert_p0"
@@ -88,7 +91,24 @@ suite("test_group_commit_data_bytes_property") {
def msg3 = group_commit_insert """insert into ${test_table}
values(3,3); """, 1
- def msg4 = group_commit_insert """insert into ${test_table}
values(4,4); """, 1
+ // add a retry for can not get a block queue because the data
bytes is too small
+ def msg4 = ""
+ Awaitility.await().atMost(10, SECONDS).until(
+ {
+ try {
+ sql """ set group_commit = async_mode; """
+ msg4 = group_commit_insert """insert into
${test_table} values(4,4); """, 1
+ return true
+ } catch (Exception e) {
+ logger.info("get exception: ${e.getMessage()}")
+ if (e.getMessage().contains("can not get a block
queue")) {
+ return false
+ } else {
+ throw e
+ }
+ }
+ }
+ )
assertNotEquals(msg3.substring(msg3.indexOf("group_commit")+11,
msg3.indexOf("group_commit")+43),
msg4.substring(msg4.indexOf("group_commit")+11,
msg4.indexOf("group_commit")+43));
diff --git
a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
b/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
index 3e84a9f56e8..c28f9192258 100644
--- a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
@@ -109,7 +109,7 @@ suite("txn_insert_concurrent_insert") {
futures.add(future)
}
CompletableFuture<?>[] futuresArray = futures.toArray(new
CompletableFuture[0])
- CompletableFuture.allOf(futuresArray).get(2, TimeUnit.MINUTES)
+ CompletableFuture.allOf(futuresArray).get(3, TimeUnit.MINUTES)
sql """ sync """
def result = sql """ select count() from ${tableName}_0 """
diff --git
a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
index 5ec3cab11d0..bd443b6eab7 100644
---
a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
+++
b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
@@ -15,38 +15,26 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_group_commit_and_wal_back_pressure") {
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
- def tableName = "test_group_commit_and_wal_back_pressure"
- sql """ DROP TABLE IF EXISTS ${tableName}1 """
- sql """
- CREATE TABLE ${tableName}1 (
- k bigint,
- v string
- )
- UNIQUE KEY(k)
- DISTRIBUTED BY HASH (k) BUCKETS 32
- PROPERTIES(
- "replication_num" = "1"
- );
- """
+suite("test_group_commit_and_wal_back_pressure") {
- sql """ DROP TABLE IF EXISTS ${tableName}2 """
- sql """
- CREATE TABLE ${tableName}2 (
- k bigint,
- v string
- )
- UNIQUE KEY(k)
- DISTRIBUTED BY HASH (k) BUCKETS 32
- PROPERTIES(
- "replication_num" = "1"
- );
- """
+ def getRowCount = { table, expectedRowCount ->
+ Awaitility.await().atMost(90, SECONDS).pollInterval(2, SECONDS).until(
+ {
+ def result = sql "select count(*) from ${table}"
+ logger.info("table: ${table}, rowCount: ${result}")
+ return result[0][0] == expectedRowCount
+ }
+ )
+ }
- sql """ DROP TABLE IF EXISTS ${tableName}3 """
- sql """
- CREATE TABLE ${tableName}3 (
+ def tableName = "test_group_commit_and_wal_back_pressure"
+ for (int j = 1; j <= 3; j++) {
+ sql """ DROP TABLE IF EXISTS ${tableName}${j} """
+ sql """
+ CREATE TABLE ${tableName}${j} (
k bigint,
v string
)
@@ -55,7 +43,8 @@ suite("test_group_commit_and_wal_back_pressure") {
PROPERTIES(
"replication_num" = "1"
);
- """
+ """
+ }
def t1 = []
for (int i = 0; i < 20; i++) {
@@ -122,16 +111,8 @@ suite("test_group_commit_and_wal_back_pressure") {
for (Thread th in t3) {
th.join()
}
-
- // wait for group commit
- Thread.sleep(10000)
-
sql "sync"
-
- qt_1 """ select count(*) from ${tableName}1;"""
-
- qt_2 """ select count(*) from ${tableName}2;"""
-
- qt_3 """ select count(*) from ${tableName}3;"""
-
+ getRowCount "${tableName}1", 1
+ getRowCount "${tableName}2", 1
+ getRowCount "${tableName}3", 1
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]