This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 12584baa650 branch-3.0: [fix](txn load) fix delete in txn load
(#52133) (#52635)
12584baa650 is described below
commit 12584baa650d9753d965801a0b95bbbbef82b61a
Author: meiyi <[email protected]>
AuthorDate: Mon Jul 7 11:02:15 2025 +0800
branch-3.0: [fix](txn load) fix delete in txn load (#52133) (#52635)
pick https://github.com/apache/doris/pull/52133
---
.../nereids/rules/rewrite/PruneEmptyPartition.java | 31 ++++++++++++++++++++++
.../apache/doris/transaction/TransactionEntry.java | 26 ++++++++++++++++++
.../suites/insert_p0/transaction/txn_insert.groovy | 22 +++++++++++++++
3 files changed, 79 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java
index c7b8f452afb..3347518b165 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java
@@ -18,11 +18,16 @@
package org.apache.doris.nereids.rules.rewrite;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.transaction.TransactionEntry;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.List;
@@ -30,6 +35,7 @@ import java.util.List;
* Used to prune empty partition.
*/
public class PruneEmptyPartition extends OneRewriteRuleFactory {
+ public static final Logger LOG =
LogManager.getLogger(PruneEmptyPartition.class);
@Override
public Rule build() {
@@ -38,6 +44,11 @@ public class PruneEmptyPartition extends
OneRewriteRuleFactory {
OlapTable table = scan.getTable();
List<Long> partitionIdsToPrune = scan.getSelectedPartitionIds();
List<Long> ids =
table.selectNonEmptyPartitionIds(partitionIdsToPrune);
+ if (ctx.connectContext != null && ctx.connectContext.isTxnModel())
{
+ // In transaction load, need to add empty partitions which
have invisible data of sub transactions
+
selectNonEmptyPartitionIdsForTxnLoad(ctx.connectContext.getTxnEntry(), table,
scan.getSelectedIndexId(),
+ partitionIdsToPrune, ids);
+ }
if (ids.isEmpty()) {
return new
LogicalEmptyRelation(ConnectContext.get().getStatementContext().getNextRelationId(),
scan.getOutput());
@@ -49,4 +60,24 @@ public class PruneEmptyPartition extends
OneRewriteRuleFactory {
return scan.withSelectedPartitionIds(ids);
}).toRule(RuleType.PRUNE_EMPTY_PARTITION);
}
+
+ private void selectNonEmptyPartitionIdsForTxnLoad(TransactionEntry
txnEntry, OlapTable table, long indexId,
+ List<Long> selectedPartitions, List<Long> nonEmptyPartitionIds) {
+ for (Long selectedPartitionId : selectedPartitions) {
+ if (nonEmptyPartitionIds.contains(selectedPartitionId)) {
+ continue;
+ }
+ Partition partition = table.getPartition(selectedPartitionId);
+ if (partition == null) {
+ continue;
+ }
+ if (!txnEntry.getPartitionSubTxnIds(table.getId(), partition,
indexId).isEmpty()) {
+ nonEmptyPartitionIds.add(selectedPartitionId);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("add partition for txn load, table: {}, selected
partitions: {}, non empty partitions: {}",
+ table.getId(), selectedPartitions, nonEmptyPartitionIds);
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 25c4ff4b3b2..70c281201b6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -22,7 +22,9 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
@@ -501,6 +503,30 @@ public class TransactionEntry {
+ "subTxnStates={}", label, transactionId, dbId,
timeoutTimestamp, allSubTxnNum, subTransactionStates);
}
+ public List<Long> getPartitionSubTxnIds(long tableId, Partition partition,
long indexId) {
+ List<Long> subTxnIds = new ArrayList<>();
+ MaterializedIndex index = partition.getIndex(indexId);
+ if (index == null) {
+ LOG.error("index={} not found in table={}, partition={}", indexId,
tableId, partition.getId());
+ return subTxnIds;
+ }
+ for (SubTransactionState subTransactionState : subTransactionStates) {
+ if (subTransactionState.getTable().getId() != tableId) {
+ continue;
+ }
+ for (TTabletCommitInfo tabletCommitInfo :
subTransactionState.getTabletCommitInfos()) {
+ if (index.getTablet(tabletCommitInfo.getTabletId()) != null) {
+ subTxnIds.add(subTransactionState.getSubTransactionId());
+ break;
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("table_id={}, partition_id={}, sub_txn_ids={}", tableId,
partition.getId(), subTxnIds);
+ }
+ return subTxnIds;
+ }
+
private void resetByTxnInfo(TTxnLoadInfo txnLoadInfo) throws DdlException {
if (txnLoadInfo.isSetDbId()) {
this.dbId = txnLoadInfo.getDbId();
diff --git a/regression-test/suites/insert_p0/transaction/txn_insert.groovy
b/regression-test/suites/insert_p0/transaction/txn_insert.groovy
index 41b8a6d35dd..319a20fdaba 100644
--- a/regression-test/suites/insert_p0/transaction/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/transaction/txn_insert.groovy
@@ -793,6 +793,28 @@ suite("txn_insert") {
order_qt_select_cu2 """select * from ${unique_table}_2"""
order_qt_select_cu3 """select * from ${unique_table}_3"""
}
+
+ // 19. delete from empty table
+ sql """ drop table if exists txn_insert_dt6; """
+ sql """
+ CREATE TABLE `txn_insert_dt6` (
+ `ID` int NOT NULL,
+ `NAME` varchar(100) NULL,
+ `SCORE` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`ID`)
+ DISTRIBUTED BY HASH(`ID`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ sql """ begin; """
+ sql """ INSERT INTO txn_insert_dt6 select 1, 'Alice', 100; """
+ test {
+ sql """ delete from txn_insert_dt6 where id = 1; """
+ exception """Can not delete because there is a insert operation
for the same table"""
+ }
+ sql """ rollback; """
}
def db_name = "regression_test_insert_p0_transaction"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]