This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e8690b62ee8 [fix](group commit) Pick: add debug log showing why group
commit does not work; delete WAL when replay succeeds (#38611) (#38659)
e8690b62ee8 is described below
commit e8690b62ee85c2cb5cdf15bf02dd439e91088bff
Author: meiyi <[email protected]>
AuthorDate: Thu Aug 1 16:59:54 2024 +0800
[fix](group commit) Pick: add debug log showing why group commit does not work;
delete WAL when replay succeeds (#38611) (#38659)
Pick https://github.com/apache/doris/pull/38611
---
be/src/olap/wal/wal_table.cpp | 15 +++---
be/src/runtime/group_commit_mgr.cpp | 2 +
.../apache/doris/analysis/NativeInsertStmt.java | 63 ++++++++++++++++++----
...oup_commit_async_wal_msg_fault_injection.groovy | 41 +++++++++++++-
.../insert_p0/insert_group_commit_into.groovy | 8 +++
5 files changed, 108 insertions(+), 21 deletions(-)
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index ec0c412379a..38c262e9889 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -86,7 +86,6 @@ void WalTable::_pick_relay_wals() {
Status WalTable::_relay_wal_one_by_one() {
std::vector<std::shared_ptr<WalInfo>> need_retry_wals;
- std::vector<std::shared_ptr<WalInfo>> need_delete_wals;
for (auto wal_info : _replaying_queue) {
wal_info->add_retry_num();
auto st = _replay_wal_internal(wal_info->get_wal_path());
@@ -96,7 +95,12 @@ Status WalTable::_relay_wal_one_by_one() {
msg.find("LabelAlreadyUsedException") != msg.npos) {
LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
<< ", st=" << st.to_string();
- need_delete_wals.push_back(wal_info);
+ // delete wal
+ WARN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(_table_id,
wal_info->get_wal_id()),
+ "failed to delete wal=" + wal_info->get_wal_path());
+ if (config::group_commit_wait_replay_wal_finish) {
+
RETURN_IF_ERROR(_exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id()));
+ }
} else {
doris::wal_fail << 1;
LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path()
@@ -111,13 +115,6 @@ Status WalTable::_relay_wal_one_by_one() {
_replay_wal_map.emplace(retry_wal_info->get_wal_path(),
retry_wal_info);
}
}
- for (auto delete_wal_info : need_delete_wals) {
- [[maybe_unused]] auto st =
- _exec_env->wal_mgr()->delete_wal(_table_id,
delete_wal_info->get_wal_id());
- if (config::group_commit_wait_replay_wal_finish) {
-
RETURN_IF_ERROR(_exec_env->wal_mgr()->notify_relay_wal(delete_wal_info->get_wal_id()));
- }
- }
return Status::OK();
}
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 7bb30b1cc8b..b6b4c5d646d 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -398,6 +398,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
{ status = Status::InternalError(""); });
if (status.ok()) {
+
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_error",
+ { status = Status::InternalError(""); });
// commit txn
TLoadTxnCommitRequest request;
request.__set_auth_code(0); // this is a fake, fe not check it now
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index b26693134b0..ac666020a02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -83,6 +83,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
@@ -1200,28 +1202,53 @@ public class NativeInsertStmt extends InsertStmt {
LOG.warn("analyze group commit failed", e);
return;
}
- boolean partialUpdate =
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate();
- if (!isExplain() && !partialUpdate &&
ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
- && ConnectContext.get().getSessionVariable().getSqlMode() !=
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
- && targetTable instanceof OlapTable
- && ((OlapTable)
targetTable).getTableProperty().getUseSchemaLightChange()
- &&
!targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME)
- && !ConnectContext.get().isTxnModel()
- && getQueryStmt() instanceof SelectStmt
- && ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() &&
targetPartitionNames == null
- && (label == null ||
Strings.isNullOrEmpty(label.getLabelName()))
- && (analyzer == null || analyzer != null &&
!analyzer.isReAnalyze())) {
+ ConnectContext ctx = ConnectContext.get();
+ List<Pair<BooleanSupplier, Supplier<String>>> conditions = new
ArrayList<>();
+ conditions.add(Pair.of(() ->
ctx.getSessionVariable().isEnableInsertGroupCommit(),
+ () -> "group_commit session variable: " +
ctx.getSessionVariable().groupCommit));
+ conditions.add(Pair.of(() -> !isExplain(), () -> "isExplain"));
+ conditions.add(Pair.of(() ->
!ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate(),
+ () -> "enableUniqueKeyPartialUpdate"));
+ conditions.add(Pair.of(() -> !ctx.isTxnModel(), () -> "isTxnModel"));
+ conditions.add(Pair.of(() -> targetTable instanceof OlapTable,
+ () -> "not olapTable, class: " +
targetTable.getClass().getName()));
+ conditions.add(Pair.of(() -> ((OlapTable)
targetTable).getTableProperty().getUseSchemaLightChange(),
+ () -> "notUseSchemaLightChange"));
+ conditions.add(Pair.of(() ->
!targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME),
+ () -> "db is internal"));
+ conditions.add(
+ Pair.of(() -> targetPartitionNames == null, () ->
"targetPartitionNames: " + targetPartitionNames));
+ conditions.add(Pair.of(() -> ctx.getSessionVariable().getSqlMode() !=
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES,
+ () -> "sqlMode: " + ctx.getSessionVariable().getSqlMode()));
+ conditions.add(Pair.of(() -> queryStmt instanceof SelectStmt,
+ () -> "queryStmt is not SelectStmt, class: " +
queryStmt.getClass().getName()));
+ conditions.add(Pair.of(() -> ((SelectStmt)
queryStmt).getTableRefs().isEmpty(),
+ () -> "tableRefs is not empty: " + ((SelectStmt)
queryStmt).getTableRefs()));
+ conditions.add(
+ Pair.of(() -> (label == null ||
Strings.isNullOrEmpty(label.getLabelName())), () -> "label: " + label));
+ conditions.add(
+ Pair.of(() -> (analyzer == null || analyzer != null &&
!analyzer.isReAnalyze()), () -> "analyzer"));
+ boolean match = conditions.stream().allMatch(p ->
p.first.getAsBoolean());
+ if (match) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (selectStmt.getValueList() != null) {
for (List<Expr> row : selectStmt.getValueList().getRows()) {
for (Expr expr : row) {
if (!(expr instanceof LiteralExpr)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("group commit is off for table: {},
because not literal expr, "
+ + "expr: {}, row: {}",
targetTable.getName(), expr, row);
+ }
return;
}
}
}
// Does not support: insert into tbl values();
if (selectStmt.getValueList().getFirstRow().isEmpty() &&
CollectionUtils.isEmpty(targetColumnNames)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("group commit is off for table: {}, because
first row: {}, target columns: {}",
+ targetTable.getName(),
selectStmt.getValueList().getFirstRow(), targetColumnNames);
+ }
return;
}
} else {
@@ -1231,6 +1258,10 @@ public class NativeInsertStmt extends InsertStmt {
if (items != null) {
for (SelectListItem item : items) {
if (item.getExpr() != null && !(item.getExpr()
instanceof LiteralExpr)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("group commit is off for table:
{}, because not literal expr, "
+ + "expr: {}, row: {}",
targetTable.getName(), item.getExpr(), item);
+ }
return;
}
}
@@ -1238,6 +1269,16 @@ public class NativeInsertStmt extends InsertStmt {
}
}
isGroupCommit = true;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ for (Pair<BooleanSupplier, Supplier<String>> pair :
conditions) {
+ if (pair.first.getAsBoolean() == false) {
+ LOG.debug("group commit is off for table: {}, because:
{}", targetTable.getName(),
+ pair.second.get());
+ break;
+ }
+ }
+ }
}
}
diff --git
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
index c2523c49092..bb064305130 100644
---
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
@@ -15,11 +15,24 @@
// specific language governing permissions and limitations
// under the License.
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
def tableName = "wal_test"
-
+
+ def getRowCount = { expectedRowCount ->
+ Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until(
+ {
+ def result = sql "select count(*) from ${tableName}"
+ logger.info("table: ${tableName}, rowCount: ${result}")
+ return result[0][0] == expectedRowCount
+ }
+ )
+ }
+
// test successful group commit async load
sql """ DROP TABLE IF EXISTS ${tableName} """
@@ -132,4 +145,30 @@
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
assertTrue(exception)
}
+ // test replay wal should success
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k` int ,
+ `v` int ,
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k`)
+ BUCKETS 5
+ properties("replication_num" = "1", "group_commit_interval_ms" =
"4000")
+ """
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_error")
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'group_commit', 'async_mode'
+ unset 'label'
+ file 'group_commit_wal_msg.csv'
+ time 10000
+ }
+ getRowCount(5)
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_error")
+ }
}
\ No newline at end of file
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 371e392fcc3..b06e7f4c89a 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -341,6 +341,14 @@ suite("insert_group_commit_into") {
getRowCount(1)
qt_sql """ select * from ${table}; """
+
+ sql " set enable_unique_key_partial_update=true "
+ none_group_commit_insert """
+ INSERT INTO ${table} (`data_binary`, `end_time`,
`endpoint_id`, `endpoint_name`, `is_error`, `latency`, `segment_id`,
`service_id`, `service_instance_id`, `start_time`, `statement`, `tags`,
`teamID`, `time_bucket`, `trace_id`)
+ VALUES
+
('CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdG
[...]
+ 1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411',
'355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3,
'3229b7cd-f3a2-4359-aa24-946388c9cc54',
'service_46da0dab-e27d-4820-aea2-9bfc15741615',
'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304,
'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5,
tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16,
tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_ [...]
+ """, 1
}
} finally {
// try_sql("DROP TABLE ${table}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]