This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 3ae91298de7 [Fix](merge-on-write) AbstractInsertExecutor should throw
exception after running out of retry times (#40436)
3ae91298de7 is described below
commit 3ae91298de771e4eeb2382b402a34588c3180ff1
Author: bobhan1 <[email protected]>
AuthorDate: Tue Sep 10 14:38:38 2024 +0800
[Fix](merge-on-write) AbstractInsertExecutor should throw exception after
running out of retry times (#40436)
## Proposed changes
For cloud merge-on-write (MoW) tables, `AbstractInsertExecutor` retries committing
the insert statement when the commit fails with a delete-bitmap lock error. However, it
forgot to throw the exception after running out of retry times, so an OK status could be
returned to the user even though the insert statement had not actually been committed.
---
.../commands/insert/AbstractInsertExecutor.java | 11 ++-
.../insert/BaseExternalTableInsertExecutor.java | 1 +
.../plans/commands/insert/OlapInsertExecutor.java | 1 +
.../cloud/test_cloud_mow_insert_timeout.out | 11 +++
.../cloud/test_cloud_mow_insert_timeout.groovy | 88 ++++++++++++++++++++++
5 files changed, 110 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index defcd6c6e99..cdf74f5e9ac 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -94,6 +94,8 @@ public abstract class AbstractInsertExecutor {
return labelName;
}
+ public abstract long getTxnId();
+
/**
* begin transaction if necessary
*/
@@ -192,14 +194,19 @@ public abstract class AbstractInsertExecutor {
execImpl(executor, jobId);
checkStrictModeAndFilterRatio();
int retryTimes = 0;
- while (retryTimes < Config.mow_insert_into_commit_retry_times) {
+ while (true) {
try {
onComplete();
break;
} catch (UserException e) {
- LOG.warn("failed to commit txn", e);
+ LOG.warn("failed to commit txn, txnId={}, jobId={},
retryTimes={}",
+ getTxnId(), jobId, retryTimes, e);
if (e.getErrorCode() ==
InternalErrorCode.DELETE_BITMAP_LOCK_ERR) {
retryTimes++;
+ if (retryTimes >=
Config.mow_insert_into_commit_retry_times) {
+ // should throw exception after running out of
retry times
+ throw e;
+ }
} else {
throw e;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index 1c22b9bf56a..a3aa33f96ab 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -70,6 +70,7 @@ public abstract class BaseExternalTableInsertExecutor extends
AbstractInsertExec
}
}
+ @Override
public long getTxnId() {
return txnId;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 1262829aa48..b57ac383495 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -85,6 +85,7 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}
+ @Override
public long getTxnId() {
return txnId;
}
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.out
new file mode 100644
index 00000000000..ee71e1e449d
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 1 1
+2 2 2
+3 3 3
+
+-- !sql --
+1 1 1
+2 2 2
+3 3 3
+
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy
new file mode 100644
index 00000000000..23d92f31e5a
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+// Cloud MoW: when the delete bitmap cannot be calculated in time, delete/insert must return an error once commit retries run out.
+suite("test_cloud_mow_insert_timeout", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def table1 = "test_cloud_mow_insert_timeout"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "enable_mow_light_delete" = "false",
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+
+ sql "insert into ${table1} values(1,1,1);"
+ sql "insert into ${table1} values(2,2,2);"
+ sql "insert into ${table1} values(3,3,3);"
+ sql "sync;"
+ order_qt_sql "select * from ${table1};"
+
+ def customFeConfig = [ // short delete-bitmap lock/task timeouts; commit is attempted at most 2 times
+ delete_bitmap_lock_expiration_seconds : 5,
+ calculate_delete_bitmap_task_timeout_seconds : 2,
+ mow_insert_into_commit_retry_times : 2
+ ]
+
+ setFeConfigTemporary(customFeConfig) {
+ try {
+ explain { // sanity check: the delete is planned as a partial update
+ sql "delete from ${table1} where k1=2;"
+ contains "IS_PARTIAL_UPDATE: true"
+ }
+
+ // block the calculation of delete bitmap on BE so the commit hits the calculation timeout
+
GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait",
[token: "token1"])
+
GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block",
[wait_token: "token1"])
+
+ // should return error after running out of retry times
+ test {
+ sql "delete from ${table1} where k1=2;"
+ exception "Failed to calculate delete bitmap. Timeout."
+ }
+
+ test { // a plain insert must fail the same way
+ sql "insert into ${table1} values(4,4,4)"
+ exception "Failed to calculate delete bitmap. Timeout."
+ }
+
+ order_qt_sql "select * from ${table1};" // expected unchanged (1,2,3): neither the delete nor the insert took effect
+
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ sql "DROP TABLE IF EXISTS ${table1};"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]