This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 6bec20afd6c branch-3.1: [enhance](transaction) add completeness of 
commit tablet info check in cloud mode #53979 (#54305)
6bec20afd6c is described below

commit 6bec20afd6cf6c2b176dcc84b23454c03e28ec3a
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Aug 5 20:18:12 2025 +0800

    branch-3.1: [enhance](transaction) add completeness of commit tablet info 
check in cloud mode #53979 (#54305)
    
    Cherry-picked from #53979
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/vec/sink/writer/vtablet_writer.cpp          | 10 ++++
 .../transaction/CloudGlobalTransactionMgr.java     | 70 ++++++++++++++++++++++
 .../test_incomplete_commit_info.groovy             | 63 +++++++++++++++++++
 3 files changed, 143 insertions(+)

diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index f6d8b844b18..fdb75a31d87 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1014,7 +1014,17 @@ void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult
         if (!st.ok()) {
             _cancel_with_msg(st.to_string());
         } else if (ctx._is_last_rpc) {
+            bool skip_tablet_info = false;
+            
DBUG_EXECUTE_IF("VNodeChannel.add_block_success_callback.incomplete_commit_info",
+                            { skip_tablet_info = true; });
             for (const auto& tablet : result.tablet_vec()) {
+                
DBUG_EXECUTE_IF("VNodeChannel.add_block_success_callback.incomplete_commit_info",
 {
+                    if (skip_tablet_info) {
+                        LOG(INFO) << "skip tablet info: " << 
tablet.tablet_id();
+                        skip_tablet_info = false;
+                        continue;
+                    }
+                });
                 TTabletCommitInfo commit_info;
                 commit_info.tabletId = tablet.tablet_id();
                 commit_info.backendId = _node_id;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index b7443dbaab4..2abbc072b9f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -72,6 +72,7 @@ import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
@@ -687,6 +688,8 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
 
     private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long 
transactionId, boolean is2PC)
             throws UserException {
+        checkCommitInfo(commitTxnRequest);
+
         CommitTxnResponse commitTxnResponse = null;
         TransactionState txnState = null;
         int retryTime = 0;
@@ -748,6 +751,73 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         return txnState;
     }
 
+    private void checkCommitInfo(CommitTxnRequest commitTxnRequest) throws 
UserException {
+        List<Long> commitTabletIds = Lists.newArrayList();
+        List<Long> commitIndexIds = Lists.newArrayList();
+        commitTabletIds.addAll(commitTxnRequest.getBaseTabletIdsList());
+        for (SubTxnInfo subTxnInfo : commitTxnRequest.getSubTxnInfosList()) {
+            commitTabletIds.addAll(subTxnInfo.getBaseTabletIdsList());
+        }
+        if (commitTabletIds.isEmpty()) {
+            return;
+        }
+
+        TabletInvertedIndex tabletInvertedIndex = 
Env.getCurrentEnv().getTabletInvertedIndex();
+        List<TabletMeta> tabletMetaList = 
tabletInvertedIndex.getTabletMetaList(commitTabletIds);
+        Map<OlapTable, Set<Long>> tableToPartition = Maps.newHashMap();
+        for (int i = 0; i < tabletMetaList.size(); i++) {
+            TabletMeta tabletMeta = tabletMetaList.get(i);
+            if (tabletMeta == null) {
+                continue;
+            }
+            long tableId = tabletMeta.getTableId();
+            long dbId = tabletMeta.getDbId();
+            Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                // this can happen when dbId == -1 (tablet being dropped) or 
the db really does not exist.
+                continue;
+            }
+            OlapTable tbl = (OlapTable) db.getTableNullable(tableId);
+            if (tbl == null) {
+                // this can happen when tableId == -1 (tablet being dropped) 
or the table really does not exist.
+                continue;
+            }
+            // check that the related partition still exists
+            long partitionId = tabletMeta.getPartitionId();
+            if (tbl.getPartition(partitionId) == null) {
+                // this can happen when partitionId == -1 (tablet being 
dropped) or the partition really does not exist.
+                continue;
+            }
+            tableToPartition.computeIfAbsent(tbl, k -> 
Sets.newHashSet()).add(partitionId);
+            commitIndexIds.add(tabletMeta.getIndexId());
+        }
+
+        for (OlapTable tbl : tableToPartition.keySet()) {
+            for (Partition partition : tbl.getAllPartitions()) {
+                if (!tableToPartition.get(tbl).contains(partition.getId())) {
+                    continue;
+                }
+                List<MaterializedIndex> allIndices
+                            = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                for (MaterializedIndex index : allIndices) {
+                    // A schema change during load increases the number of 
indexes in the partition,
+                    // so we need to skip these indexes.
+                    // TODO: judging by transactionState.getLoadedTblIndexes() 
would be better
+                    if (!commitIndexIds.contains(index.getId())) {
+                        continue;
+                    }
+                    for (Tablet tablet : index.getTablets()) {
+                        if (!commitTabletIds.contains(tablet.getId())) {
+                            throw new LoadException("Table [" + tbl.getName() 
+ "], Index ["
+                                    + index.getId() + "], Partition [" + 
partition.getName()
+                                    + "], tablet " + tablet.getId() + " should 
be committed");
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     // return mow tables that contain tablet commit info
     private List<OlapTable> getMowTableList(List<Table> tableList, 
List<TabletCommitInfo> tabletCommitInfos) {
         if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
diff --git 
a/regression-test/suites/fault_injection_p0/test_incomplete_commit_info.groovy 
b/regression-test/suites/fault_injection_p0/test_incomplete_commit_info.groovy
new file mode 100644
index 00000000000..2b6503a929e
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_incomplete_commit_info.groovy
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_incomplete_commit_info", "nonConcurrent") {
+    try {
+        def tableName = "test_incomplete_commit_info"
+        sql """ DROP TABLE IF EXISTS ${tableName}; """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = 
"1")
+            """
+        
GetDebugPoint().enableDebugPointForAllBEs("VNodeChannel.add_block_success_callback.incomplete_commit_info")
+        streamLoad {
+            table "${tableName}"
+            db "regression_test_fault_injection_p0"
+            set 'column_separator', ','
+            file "baseall.txt"
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+            }
+        }
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("VNodeChannel.add_block_success_callback.incomplete_commit_info")
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to