This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e8690b62ee8 [fix](group commit) Pick add debug log show why group commit not work; delete wal when replay success (#38611) (#38659)
e8690b62ee8 is described below

commit e8690b62ee85c2cb5cdf15bf02dd439e91088bff
Author: meiyi <[email protected]>
AuthorDate: Thu Aug 1 16:59:54 2024 +0800

    [fix](group commit) Pick add debug log show why group commit not work; delete wal when replay success (#38611) (#38659)
    
    Pick https://github.com/apache/doris/pull/38611
---
 be/src/olap/wal/wal_table.cpp                      | 15 +++---
 be/src/runtime/group_commit_mgr.cpp                |  2 +
 .../apache/doris/analysis/NativeInsertStmt.java    | 63 ++++++++++++++++++----
 ...oup_commit_async_wal_msg_fault_injection.groovy | 41 +++++++++++++-
 .../insert_p0/insert_group_commit_into.groovy      |  8 +++
 5 files changed, 108 insertions(+), 21 deletions(-)

diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index ec0c412379a..38c262e9889 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -86,7 +86,6 @@ void WalTable::_pick_relay_wals() {
 
 Status WalTable::_relay_wal_one_by_one() {
     std::vector<std::shared_ptr<WalInfo>> need_retry_wals;
-    std::vector<std::shared_ptr<WalInfo>> need_delete_wals;
     for (auto wal_info : _replaying_queue) {
         wal_info->add_retry_num();
         auto st = _replay_wal_internal(wal_info->get_wal_path());
@@ -96,7 +95,12 @@ Status WalTable::_relay_wal_one_by_one() {
             msg.find("LabelAlreadyUsedException") != msg.npos) {
             LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
                       << ", st=" << st.to_string();
-            need_delete_wals.push_back(wal_info);
+            // delete wal
+            WARN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(_table_id, 
wal_info->get_wal_id()),
+                          "failed to delete wal=" + wal_info->get_wal_path());
+            if (config::group_commit_wait_replay_wal_finish) {
+                
RETURN_IF_ERROR(_exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id()));
+            }
         } else {
             doris::wal_fail << 1;
             LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path()
@@ -111,13 +115,6 @@ Status WalTable::_relay_wal_one_by_one() {
             _replay_wal_map.emplace(retry_wal_info->get_wal_path(), 
retry_wal_info);
         }
     }
-    for (auto delete_wal_info : need_delete_wals) {
-        [[maybe_unused]] auto st =
-                _exec_env->wal_mgr()->delete_wal(_table_id, 
delete_wal_info->get_wal_id());
-        if (config::group_commit_wait_replay_wal_finish) {
-            
RETURN_IF_ERROR(_exec_env->wal_mgr()->notify_relay_wal(delete_wal_info->get_wal_id()));
-        }
-    }
     return Status::OK();
 }
 
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 7bb30b1cc8b..b6b4c5d646d 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -398,6 +398,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
     DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
                     { status = Status::InternalError(""); });
     if (status.ok()) {
+        
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_error",
+                        { status = Status::InternalError(""); });
         // commit txn
         TLoadTxnCommitRequest request;
         request.__set_auth_code(0); // this is a fake, fe not check it now
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index b26693134b0..ac666020a02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -83,6 +83,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /**
@@ -1200,28 +1202,53 @@ public class NativeInsertStmt extends InsertStmt {
             LOG.warn("analyze group commit failed", e);
             return;
         }
-        boolean partialUpdate = 
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate();
-        if (!isExplain() && !partialUpdate && 
ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
-                && ConnectContext.get().getSessionVariable().getSqlMode() != 
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
-                && targetTable instanceof OlapTable
-                && ((OlapTable) 
targetTable).getTableProperty().getUseSchemaLightChange()
-                && 
!targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME)
-                && !ConnectContext.get().isTxnModel()
-                && getQueryStmt() instanceof SelectStmt
-                && ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() && 
targetPartitionNames == null
-                && (label == null || 
Strings.isNullOrEmpty(label.getLabelName()))
-                && (analyzer == null || analyzer != null && 
!analyzer.isReAnalyze())) {
+        ConnectContext ctx = ConnectContext.get();
+        List<Pair<BooleanSupplier, Supplier<String>>> conditions = new 
ArrayList<>();
+        conditions.add(Pair.of(() -> 
ctx.getSessionVariable().isEnableInsertGroupCommit(),
+                () -> "group_commit session variable: " + 
ctx.getSessionVariable().groupCommit));
+        conditions.add(Pair.of(() -> !isExplain(), () -> "isExplain"));
+        conditions.add(Pair.of(() -> 
!ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate(),
+                () -> "enableUniqueKeyPartialUpdate"));
+        conditions.add(Pair.of(() -> !ctx.isTxnModel(), () -> "isTxnModel"));
+        conditions.add(Pair.of(() -> targetTable instanceof OlapTable,
+                () -> "not olapTable, class: " + 
targetTable.getClass().getName()));
+        conditions.add(Pair.of(() -> ((OlapTable) 
targetTable).getTableProperty().getUseSchemaLightChange(),
+                () -> "notUseSchemaLightChange"));
+        conditions.add(Pair.of(() -> 
!targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME),
+                () -> "db is internal"));
+        conditions.add(
+                Pair.of(() -> targetPartitionNames == null, () -> 
"targetPartitionNames: " + targetPartitionNames));
+        conditions.add(Pair.of(() -> ctx.getSessionVariable().getSqlMode() != 
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES,
+                () -> "sqlMode: " + ctx.getSessionVariable().getSqlMode()));
+        conditions.add(Pair.of(() -> queryStmt instanceof SelectStmt,
+                () -> "queryStmt is not SelectStmt, class: " + 
queryStmt.getClass().getName()));
+        conditions.add(Pair.of(() -> ((SelectStmt) 
queryStmt).getTableRefs().isEmpty(),
+                () -> "tableRefs is not empty: " + ((SelectStmt) 
queryStmt).getTableRefs()));
+        conditions.add(
+                Pair.of(() -> (label == null || 
Strings.isNullOrEmpty(label.getLabelName())), () -> "label: " + label));
+        conditions.add(
+                Pair.of(() -> (analyzer == null || analyzer != null && 
!analyzer.isReAnalyze()), () -> "analyzer"));
+        boolean match = conditions.stream().allMatch(p -> 
p.first.getAsBoolean());
+        if (match) {
             SelectStmt selectStmt = (SelectStmt) queryStmt;
             if (selectStmt.getValueList() != null) {
                 for (List<Expr> row : selectStmt.getValueList().getRows()) {
                     for (Expr expr : row) {
                         if (!(expr instanceof LiteralExpr)) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("group commit is off for table: {}, 
because not literal expr, "
+                                        + "expr: {}, row: {}", 
targetTable.getName(), expr, row);
+                            }
                             return;
                         }
                     }
                 }
                 // Does not support: insert into tbl values();
                 if (selectStmt.getValueList().getFirstRow().isEmpty() && 
CollectionUtils.isEmpty(targetColumnNames)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("group commit is off for table: {}, because 
first row: {}, target columns: {}",
+                                targetTable.getName(), 
selectStmt.getValueList().getFirstRow(), targetColumnNames);
+                    }
                     return;
                 }
             } else {
@@ -1231,6 +1258,10 @@ public class NativeInsertStmt extends InsertStmt {
                     if (items != null) {
                         for (SelectListItem item : items) {
                             if (item.getExpr() != null && !(item.getExpr() 
instanceof LiteralExpr)) {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("group commit is off for table: 
{}, because not literal expr, "
+                                            + "expr: {}, row: {}", 
targetTable.getName(), item.getExpr(), item);
+                                }
                                 return;
                             }
                         }
@@ -1238,6 +1269,16 @@ public class NativeInsertStmt extends InsertStmt {
                 }
             }
             isGroupCommit = true;
+        } else {
+            if (LOG.isDebugEnabled()) {
+                for (Pair<BooleanSupplier, Supplier<String>> pair : 
conditions) {
+                    if (pair.first.getAsBoolean() == false) {
+                        LOG.debug("group commit is off for table: {}, because: 
{}", targetTable.getName(),
+                                pair.second.get());
+                        break;
+                    }
+                }
+            }
         }
     }
 
diff --git 
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
index c2523c49092..bb064305130 100644
--- 
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
@@ -15,11 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
 suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
 
 
     def tableName = "wal_test"
- 
+
+    def getRowCount = { expectedRowCount ->
+        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until(
+            {
+                def result = sql "select count(*) from ${tableName}"
+                logger.info("table: ${tableName}, rowCount: ${result}")
+                return result[0][0] == expectedRowCount
+            }
+        )
+    }
+
     // test successful group commit async load
     sql """ DROP TABLE IF EXISTS ${tableName} """
 
@@ -132,4 +145,30 @@ 
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
             assertTrue(exception)
         }
 
+    // test replay wal should success
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k` int ,
+            `v` int ,
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k`) 
+        BUCKETS 5 
+        properties("replication_num" = "1", "group_commit_interval_ms" = 
"4000")
+    """
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_error")
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            set 'group_commit', 'async_mode'
+            unset 'label'
+            file 'group_commit_wal_msg.csv'
+            time 10000
+        }
+        getRowCount(5)
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_error")
+    }
 }
\ No newline at end of file
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy 
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 371e392fcc3..b06e7f4c89a 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -341,6 +341,14 @@ suite("insert_group_commit_into") {
 
                 getRowCount(1)
                 qt_sql """ select * from ${table}; """
+
+                sql " set enable_unique_key_partial_update=true "
+                none_group_commit_insert """ 
+                INSERT INTO ${table} (`data_binary`, `end_time`, 
`endpoint_id`, `endpoint_name`, `is_error`, `latency`, `segment_id`, 
`service_id`, `service_instance_id`, `start_time`, `statement`, `tags`, 
`teamID`, `time_bucket`, `trace_id`) 
+                VALUES 
+                
('CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdG
 [...]
+                1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411', 
'355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3, 
'3229b7cd-f3a2-4359-aa24-946388c9cc54', 
'service_46da0dab-e27d-4820-aea2-9bfc15741615', 
'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304, 
'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5, 
tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16, 
tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_ [...]
+                """, 1
             }
         } finally {
             // try_sql("DROP TABLE ${table}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to