This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 01490139ef6 [improve](txn insert) txn insert support delete command
(#33100)
01490139ef6 is described below
commit 01490139ef6490778129fe1892f8c3cfbc159074
Author: meiyi <[email protected]>
AuthorDate: Thu Apr 4 21:04:12 2024 +0800
[improve](txn insert) txn insert support delete command (#33100)
---
.../main/java/org/apache/doris/load/DeleteJob.java | 19 ++--
.../java/org/apache/doris/load/TxnDeleteJob.java | 67 ++++++++++++++
.../plans/commands/DeleteFromUsingCommand.java | 1 +
.../trees/plans/commands/UpdateCommand.java | 1 +
.../java/org/apache/doris/qe/StmtExecutor.java | 8 +-
regression-test/data/insert_p0/txn_insert.out | 19 ++++
regression-test/suites/insert_p0/txn_insert.groovy | 100 +++++++++++++++++++++
7 files changed, 207 insertions(+), 8 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index 0e515ab9e52..e62e5dd9f71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -97,8 +97,8 @@ public class DeleteJob extends AbstractTxnStateChangeCallback
implements DeleteJ
// jobId(listenerId). use in beginTransaction to callback function
private final long id;
- private long transactionId;
- private final String label;
+ protected long transactionId;
+ protected String label;
private final Set<Long> totalTablets;
private final Set<Long> quorumTablets;
private final Set<Long> finishedTablets;
@@ -110,7 +110,7 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback implements DeleteJ
private Database targetDb;
- private OlapTable targetTbl;
+ protected OlapTable targetTbl;
private List<Partition> partitions;
@@ -413,8 +413,7 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback implements DeleteJ
}
}
- @Override
- public String commit() throws Exception {
+ protected List<TabletCommitInfo> generateTabletCommitInfos() {
TabletInvertedIndex currentInvertedIndex =
Env.getCurrentInvertedIndex();
List<TabletCommitInfo> tabletCommitInfos = Lists.newArrayList();
tabletDeleteInfoMap.forEach((tabletId, deleteInfo) ->
deleteInfo.getFinishedReplicas()
@@ -425,6 +424,12 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback implements DeleteJ
}
tabletCommitInfos.add(new TabletCommitInfo(tabletId,
replica.getBackendId()));
}));
+ return tabletCommitInfos;
+ }
+
+ @Override
+ public String commit() throws Exception {
+ List<TabletCommitInfo> tabletCommitInfos = generateTabletCommitInfos();
boolean visible = Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(targetDb,
Lists.newArrayList(targetTbl),
transactionId, tabletCommitInfos, getTimeoutMs());
@@ -534,7 +539,9 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback implements DeleteJ
DeleteInfo deleteInfo = new DeleteInfo(params.getDb().getId(),
params.getTable().getId(),
params.getTable().getName(),
getDeleteCondString(params.getDeleteConditions()),
noPartitionSpecified, partitionIds, partitionNames);
- DeleteJob deleteJob = new DeleteJob(jobId, -1, label,
partitionReplicaNum, deleteInfo);
+ DeleteJob deleteJob = ConnectContext.get() != null &&
ConnectContext.get().isTxnModel()
+ ? new TxnDeleteJob(jobId, -1, label, partitionReplicaNum,
deleteInfo)
+ : new DeleteJob(jobId, -1, label, partitionReplicaNum,
deleteInfo);
long replicaNum =
partitions.stream().mapToLong(Partition::getAllReplicaCount).sum();
deleteJob.setPartitions(partitions);
deleteJob.setDeleteConditions(params.getDeleteConditions());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/TxnDeleteJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/TxnDeleteJob.java
new file mode 100644
index 00000000000..79a4bdffa01
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/TxnDeleteJob.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TTabletCommitInfo;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TransactionEntry;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TxnDeleteJob extends DeleteJob {
+ private static final Logger LOG = LogManager.getLogger(TxnDeleteJob.class);
+
+ public TxnDeleteJob(long id, long transactionId, String label, Map<Long,
Short> partitionReplicaNum,
+ DeleteInfo deleteInfo) {
+ super(id, transactionId, label, partitionReplicaNum, deleteInfo);
+ }
+
+ @Override
+ public long beginTxn() throws Exception {
+ TransactionEntry txnEntry = ConnectContext.get().getTxnEntry();
+ txnEntry.beginTransaction(targetTbl.getDatabase(), targetTbl);
+ this.transactionId = txnEntry.getTransactionId();
+ this.label = txnEntry.getLabel();
+ return this.transactionId;
+ }
+
+ @Override
+ public String commit() throws Exception {
+ List<TabletCommitInfo> tabletCommitInfos = generateTabletCommitInfos();
+ TransactionEntry txnEntry = ConnectContext.get().getTxnEntry();
+ txnEntry.addCommitInfos(targetTbl,
+ tabletCommitInfos.stream().map(c -> new
TTabletCommitInfo(c.getTabletId(), c.getBackendId()))
+ .collect(Collectors.toList()));
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("{'label':'").append(label).append("',
'txnId':'").append(transactionId);
+ sb.append("',
'status':'").append(TransactionStatus.PREPARE.name()).append("'").append("}");
+ return sb.toString();
+ }
+
+ @Override
+ public void cancel(String reason) {
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
index ff70c75558d..cbd0c8ff6e5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
@@ -77,6 +77,7 @@ public class DeleteFromUsingCommand extends Command
implements ForwardWithSync,
+ " Please check the following session variables: "
+ String.join(", ", SessionVariable.DEBUG_VARIABLES));
}
+ // NOTE: delete from using command is executed as insert command, so
txn insert can support it
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery),
Optional.empty(), Optional.empty()).run(ctx,
executor);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
index 76143c0e80f..4cf6b832e12 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
@@ -94,6 +94,7 @@ public class UpdateCommand extends Command implements
ForwardWithSync, Explainab
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws
Exception {
+ // NOTE: update command is executed as insert command, so txn insert
can support it
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery),
Optional.empty(), Optional.empty()).run(ctx,
executor);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 4b0dfd01469..930ebf943cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -138,6 +138,8 @@ import org.apache.doris.nereids.parser.NereidsParser;
import
org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand;
+import org.apache.doris.nereids.trees.plans.commands.DeleteFromUsingCommand;
import org.apache.doris.nereids.trees.plans.commands.Forward;
import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
@@ -653,8 +655,10 @@ public class StmtExecutor {
// when we in transaction mode, we only support insert into command
and transaction command
if (context.isTxnModel()) {
if (!(logicalPlan instanceof BatchInsertIntoTableCommand ||
logicalPlan instanceof InsertIntoTableCommand
- || logicalPlan instanceof UpdateCommand)) {
- String errMsg = "This is in a transaction, only insert,
update, commit, rollback is acceptable.";
+ || logicalPlan instanceof UpdateCommand || logicalPlan
instanceof DeleteFromUsingCommand
+ || logicalPlan instanceof DeleteFromCommand)) {
+ String errMsg = "This is in a transaction, only insert,
update, delete, "
+ + "commit, rollback is acceptable.";
throw new NereidsException(errMsg, new
AnalysisException(errMsg));
}
}
diff --git a/regression-test/data/insert_p0/txn_insert.out
b/regression-test/data/insert_p0/txn_insert.out
index d317b2d8452..299b136f344 100644
--- a/regression-test/data/insert_p0/txn_insert.out
+++ b/regression-test/data/insert_p0/txn_insert.out
@@ -304,3 +304,22 @@
-- !select26 --
1 a 100
+-- !select27 --
+1 2000-01-01 1 1 1.0
+3 2000-01-03 3 3 3.0
+
+-- !select28 --
+2 2000-01-20 20 20 20.0
+3 2000-01-30 30 30 30.0
+4 2000-01-04 4 4 4.0
+6 2000-01-10 10 10 10.0
+
+-- !select29 --
+3 2000-01-03 3 3 3.0
+
+-- !select30 --
+1 2000-01-01 1 1 1.0
+2 2000-01-02 2 2 2.0
+3 2000-01-03 3 3 3.0
+6 2000-01-10 10 10 10.0
+
diff --git a/regression-test/suites/insert_p0/txn_insert.groovy
b/regression-test/suites/insert_p0/txn_insert.groovy
index a8f173e62bc..7d4f1e9f329 100644
--- a/regression-test/suites/insert_p0/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert.groovy
@@ -239,8 +239,108 @@ suite("txn_insert") {
sql """ insert into ${ut_table}_2 select * from ${ut_table}_1; """
sql """ update ${ut_table}_1 set score = 101 where id = 1; """
sql """ commit; """
+ sql "sync"
order_qt_select25 """select * from ${ut_table}_1 """
order_qt_select26 """select * from ${ut_table}_2 """
}
+
+ // 8. delete from using and delete from stmt
+ if (use_nereids_planner) {
+ for (def ta in ["txn_insert_dt1", "txn_insert_dt2",
"txn_insert_dt3", "txn_insert_dt4", "txn_insert_dt5"]) {
+ sql """ drop table if exists ${ta} """
+ }
+
+ for (def ta in ["txn_insert_dt1", "txn_insert_dt4",
"txn_insert_dt5"]) {
+ sql """
+ create table ${ta} (
+ id int,
+ dt date,
+ c1 bigint,
+ c2 string,
+ c3 double
+ ) unique key (id, dt)
+ partition by range(dt) (
+ from ("2000-01-01") TO ("2000-01-31") INTERVAL 1 DAY
+ )
+ distributed by hash(id)
+ properties(
+ 'replication_num'='1',
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+ sql """
+ INSERT INTO ${ta} VALUES
+ (1, '2000-01-01', 1, '1', 1.0),
+ (2, '2000-01-02', 2, '2', 2.0),
+ (3, '2000-01-03', 3, '3', 3.0);
+ """
+ }
+
+ sql """
+ create table txn_insert_dt2 (
+ id int,
+ dt date,
+ c1 bigint,
+ c2 string,
+ c3 double
+ ) unique key (id)
+ distributed by hash(id)
+ properties(
+ 'replication_num'='1'
+ );
+ """
+ sql """
+ create table txn_insert_dt3 (
+ id int
+ ) distributed by hash(id)
+ properties(
+ 'replication_num'='1'
+ );
+ """
+ sql """
+ INSERT INTO txn_insert_dt2 VALUES
+ (1, '2000-01-10', 10, '10', 10.0),
+ (2, '2000-01-20', 20, '20', 20.0),
+ (3, '2000-01-30', 30, '30', 30.0),
+ (4, '2000-01-04', 4, '4', 4.0),
+ (5, '2000-01-05', 5, '5', 5.0);
+ """
+ sql """
+ INSERT INTO txn_insert_dt3 VALUES(1),(2),(4),(5);
+ """
+ sql """ begin """
+ test {
+ sql '''
+ delete from txn_insert_dt1 temporary partition (p_20000102)
+ using txn_insert_dt2 join txn_insert_dt3 on
txn_insert_dt2.id = txn_insert_dt3.id
+ where txn_insert_dt1.id = txn_insert_dt2.id;
+ '''
+ exception 'Partition: p_20000102 is not exists'
+ }
+ sql """
+ delete from txn_insert_dt1 partition (p_20000102)
+ using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id
= txn_insert_dt3.id
+ where txn_insert_dt1.id = txn_insert_dt2.id;
+ """
+ sql """
+ delete from txn_insert_dt4
+ using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id
= txn_insert_dt3.id
+ where txn_insert_dt4.id = txn_insert_dt2.id;
+ """
+ sql """
+ delete from txn_insert_dt2 where id = 1 or id = 5;
+ """
+ sql """
+ delete from txn_insert_dt5 partition(p_20000102) where id = 1
or id = 5;
+ """
+ sql """ commit """
+ sql """ insert into txn_insert_dt2 VALUES (6, '2000-01-10', 10,
'10', 10.0) """
+ sql """ insert into txn_insert_dt5 VALUES (6, '2000-01-10', 10,
'10', 10.0) """
+ sql "sync"
+ order_qt_select27 """select * from txn_insert_dt1 """
+ order_qt_select28 """select * from txn_insert_dt2 """
+ order_qt_select29 """select * from txn_insert_dt4 """
+ order_qt_select30 """select * from txn_insert_dt5 """
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]