This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 2421a605bd5 branch-3.1: [opt](iceberg) support insert to branch #52751 #53703 (#53693)
2421a605bd5 is described below
commit 2421a605bd597dde51f44717a8e7ca7312922ec5
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Jul 22 03:58:11 2025 -0700
branch-3.1: [opt](iceberg) support insert to branch #52751 #53703 (#53693)
bp #52751 #53703
---------
Co-authored-by: wuwenchi <[email protected]>
---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 6 +-
.../datasource/iceberg/IcebergTransaction.java | 52 ++++--
.../doris/nereids/parser/LogicalPlanBuilder.java | 8 +-
.../trees/plans/commands/CreateTableCommand.java | 4 +-
.../plans/commands/DeleteFromUsingCommand.java | 2 +-
.../trees/plans/commands/UpdateCommand.java | 2 +-
.../plans/commands/UpdateMvByPartitionCommand.java | 2 +-
.../insert/IcebergInsertCommandContext.java | 11 ++
.../commands/insert/IcebergInsertExecutor.java | 4 +-
.../commands/insert/InsertIntoTableCommand.java | 17 +-
.../insert/InsertOverwriteTableCommand.java | 18 +-
.../org/apache/doris/planner/IcebergTableSink.java | 4 +-
.../datasource/iceberg/IcebergTransactionTest.java | 16 +-
.../iceberg/iceberg_branch_insert_data.out | Bin 0 -> 2980 bytes
.../iceberg/iceberg_branch_insert_data.groovy | 191 +++++++++++++++++++++
15 files changed, 298 insertions(+), 39 deletions(-)
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 5d95771127d..698cbc62822 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -130,9 +130,13 @@ constraintStatement
| SHOW CONSTRAINTS FROM table=multipartIdentifier
#showConstraint
;
+optSpecBranch
+ : ATSIGN BRANCH LEFT_PAREN name=identifier RIGHT_PAREN
+ ;
+
supportedDmlStatement
: explain? cte? INSERT (INTO | OVERWRITE TABLE)
- (tableName=multipartIdentifier | DORIS_INTERNAL_TABLE_ID LEFT_PAREN
tableId=INTEGER_VALUE RIGHT_PAREN)
+ (tableName=multipartIdentifier (optSpecBranch)? |
DORIS_INTERNAL_TABLE_ID LEFT_PAREN tableId=INTEGER_VALUE RIGHT_PAREN)
partitionSpec? // partition define
(WITH LABEL labelName=identifier)? cols=identifierList? // label and
columns define
(LEFT_BRACKET hints=identifierSeq RIGHT_BRACKET)? // hint define
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 9160c3012b5..019b57dface 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -24,7 +24,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper;
-import
org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
+import
org.apache.doris.nereids.trees.plans.commands.insert.IcebergInsertCommandContext;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.thrift.TIcebergCommitData;
import org.apache.doris.thrift.TUpdateMode;
@@ -38,6 +38,7 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.WriteResult;
@@ -61,6 +62,9 @@ public class IcebergTransaction implements Transaction {
private org.apache.iceberg.Transaction transaction;
private final List<TIcebergCommitData> commitDataList =
Lists.newArrayList();
+ private IcebergInsertCommandContext insertCtx;
+ private String branchName;
+
public IcebergTransaction(IcebergMetadataOps ops) {
this.ops = ops;
}
@@ -71,20 +75,34 @@ public class IcebergTransaction implements Transaction {
}
}
- public void beginInsert(ExternalTable dorisTable) throws UserException {
+ public void beginInsert(ExternalTable dorisTable,
Optional<InsertCommandContext> ctx) throws UserException {
+ ctx.ifPresent(c -> this.insertCtx = (IcebergInsertCommandContext) c);
try {
ops.getPreExecutionAuthenticator().execute(() -> {
// create and start the iceberg transaction
this.table = IcebergUtils.getIcebergTable(dorisTable);
+ // check branch
+ if (insertCtx != null &&
insertCtx.getBranchName().isPresent()) {
+ this.branchName = insertCtx.getBranchName().get();
+ SnapshotRef branchRef = table.refs().get(branchName);
+ if (branchRef == null) {
+ throw new RuntimeException(branchName + " is not
founded in " + dorisTable.getName());
+ } else if (!branchRef.isBranch()) {
+ throw new RuntimeException(
+ branchName
+ + " is a tag, not a branch. Tags cannot be
targets for producing snapshots");
+ }
+ }
this.transaction = table.newTransaction();
});
} catch (Exception e) {
- throw new UserException("Failed to begin insert for iceberg table
" + dorisTable.getName(), e);
+ throw new UserException("Failed to begin insert for iceberg table
" + dorisTable.getName()
+ + "because: " + e.getMessage(), e);
}
}
- public void finishInsert(NameMapping nameMapping,
Optional<InsertCommandContext> insertCtx) {
+ public void finishInsert(NameMapping nameMapping) {
if (LOG.isDebugEnabled()) {
LOG.info("iceberg table {} insert table finished!",
nameMapping.getFullLocalName());
}
@@ -92,8 +110,8 @@ public class IcebergTransaction implements Transaction {
ops.getPreExecutionAuthenticator().execute(() -> {
//create and start the iceberg transaction
TUpdateMode updateMode = TUpdateMode.APPEND;
- if (insertCtx.isPresent()) {
- updateMode = ((BaseExternalTableInsertCommandContext)
insertCtx.get()).isOverwrite()
+ if (insertCtx != null) {
+ updateMode = insertCtx.isOverwrite()
? TUpdateMode.OVERWRITE
: TUpdateMode.APPEND;
}
@@ -146,6 +164,9 @@ public class IcebergTransaction implements Transaction {
private void commitAppendTxn(List<WriteResult> pendingResults) {
// commit append files.
AppendFiles appendFiles =
transaction.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
+ if (branchName != null) {
+ appendFiles = appendFiles.toBranch(branchName);
+ }
for (WriteResult result : pendingResults) {
Preconditions.checkState(result.referencedDataFiles().length == 0,
"Should have no referenced data files for append.");
@@ -161,11 +182,14 @@ public class IcebergTransaction implements Transaction {
// 1. if dst_tb is a partitioned table, it will return directly.
// 2. if dst_tb is an unpartitioned table, the `dst_tb` table will
be emptied.
if (!transaction.table().spec().isPartitioned()) {
- OverwriteFiles overwriteFiles = transaction
- .newOverwrite()
- .scanManifestsWith(ops.getThreadPoolWithPreAuth());
+ OverwriteFiles overwriteFiles = transaction.newOverwrite();
+ if (branchName != null) {
+ overwriteFiles = overwriteFiles.toBranch(branchName);
+ }
+ overwriteFiles =
overwriteFiles.scanManifestsWith(ops.getThreadPoolWithPreAuth());
try (CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().planFiles()) {
- fileScanTasks.forEach(f ->
overwriteFiles.deleteFile(f.file()));
+ OverwriteFiles finalOverwriteFiles = overwriteFiles;
+ fileScanTasks.forEach(f ->
finalOverwriteFiles.deleteFile(f.file()));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -175,9 +199,11 @@ public class IcebergTransaction implements Transaction {
}
// commit replace partitions
- ReplacePartitions appendPartitionOp = transaction
- .newReplacePartitions()
- .scanManifestsWith(ops.getThreadPoolWithPreAuth());
+ ReplacePartitions appendPartitionOp =
transaction.newReplacePartitions();
+ if (branchName != null) {
+ appendPartitionOp = appendPartitionOp.toBranch(branchName);
+ }
+ appendPartitionOp =
appendPartitionOp.scanManifestsWith(ops.getThreadPoolWithPreAuth());
for (WriteResult result : pendingResults) {
Preconditions.checkState(result.referencedDataFiles().length == 0,
"Should have no referenced data files.");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index d847130b922..ea8edffcdbf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -610,6 +610,10 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
throw new ParseException("tableName and tableId cannot both be
null");
}
Optional<String> labelName = (ctx.labelName == null) ?
Optional.empty() : Optional.of(ctx.labelName.getText());
+ Optional<String> branchName = Optional.empty();
+ if (ctx.optSpecBranch() != null) {
+ branchName = Optional.of(ctx.optSpecBranch().name.getText());
+ }
List<String> colNames = ctx.cols == null ? ImmutableList.of() :
visitIdentifierList(ctx.cols);
// TODO visit partitionSpecCtx
LogicalPlan plan = visitQuery(ctx.query());
@@ -638,7 +642,7 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
}
LogicalPlan command;
if (isOverwrite) {
- command = new InsertOverwriteTableCommand(sink, labelName, cte);
+ command = new InsertOverwriteTableCommand(sink, labelName, cte,
branchName);
} else {
if (ConnectContext.get() != null &&
ConnectContext.get().isTxnModel()
&& sink.child() instanceof InlineTable
@@ -648,7 +652,7 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
// Now handle it as `insert into select`(a separate load
job), should fix it as the legacy.
command = new BatchInsertIntoTableCommand(sink);
} else {
- command = new InsertIntoTableCommand(sink, labelName,
Optional.empty(), cte);
+ command = new InsertIntoTableCommand(sink, labelName,
Optional.empty(), cte, true, branchName);
}
}
return withExplain(command, ctx.explain());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
index a00f34d29f0..66e181b38dc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
@@ -176,8 +176,8 @@ public class CreateTableCommand extends Command implements
ForwardWithSync {
ImmutableList.of(), ImmutableList.of(), ImmutableList.of(),
query);
try {
if (!FeConstants.runningUnitTest) {
- new InsertIntoTableCommand(query, Optional.empty(),
Optional.empty(), Optional.empty()).run(
- ctx, executor);
+ new InsertIntoTableCommand(query, Optional.empty(),
Optional.empty(),
+ Optional.empty(), true, Optional.empty()).run(ctx,
executor);
}
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
handleFallbackFailedCtas(ctx);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
index de374d939c1..764ff05c00f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
@@ -54,7 +54,7 @@ public class DeleteFromUsingCommand extends DeleteFromCommand
{
}
// NOTE: delete from using command is executed as insert command, so
txn insert can support it
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery),
Optional.empty(), Optional.empty(),
- Optional.empty()).run(ctx, executor);
+ Optional.empty(), true, Optional.empty()).run(ctx, executor);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
index f79303acf22..9cbedffa2bc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
@@ -97,7 +97,7 @@ public class UpdateCommand extends Command implements
ForwardWithSync, Explainab
public void run(ConnectContext ctx, StmtExecutor executor) throws
Exception {
// NOTE: update command is executed as insert command, so txn insert
can support it
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery),
Optional.empty(), Optional.empty(),
- Optional.empty()).run(ctx, executor);
+ Optional.empty(), true, Optional.empty()).run(ctx, executor);
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
index fac66f97abb..89b00cd788d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -86,7 +86,7 @@ public class UpdateMvByPartitionCommand extends
InsertOverwriteTableCommand {
private static final Logger LOG =
LogManager.getLogger(UpdateMvByPartitionCommand.class);
private UpdateMvByPartitionCommand(LogicalPlan logicalQuery) {
- super(logicalQuery, Optional.empty(), Optional.empty());
+ super(logicalQuery, Optional.empty(), Optional.empty(),
Optional.empty());
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
index a091405b219..6a921cee10c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
@@ -17,8 +17,19 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
+import java.util.Optional;
+
/**
* For iceberg External Table
*/
public class IcebergInsertCommandContext extends
BaseExternalTableInsertCommandContext {
+ private Optional<String> branchName = Optional.empty();
+
+ public Optional<String> getBranchName() {
+ return branchName;
+ }
+
+ public void setBranchName(Optional<String> branchName) {
+ this.branchName = branchName;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
index 1eee1a525be..6f2c465a831 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
@@ -49,7 +49,7 @@ public class IcebergInsertExecutor extends
BaseExternalTableInsertExecutor {
@Override
protected void beforeExec() throws UserException {
IcebergTransaction transaction = (IcebergTransaction)
transactionManager.getTransaction(txnId);
- transaction.beginInsert((IcebergExternalTable) table);
+ transaction.beginInsert((IcebergExternalTable) table, insertCtx);
}
@Override
@@ -60,7 +60,7 @@ public class IcebergInsertExecutor extends
BaseExternalTableInsertExecutor {
dorisTable.getRemoteDbName(), dorisTable.getRemoteName());
IcebergTransaction transaction = (IcebergTransaction)
transactionManager.getTransaction(txnId);
this.loadedRows = transaction.getUpdateCnt();
- transaction.finishInsert(nameMapping, insertCtx);
+ transaction.finishInsert(nameMapping);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 29c81229e58..bd46076bd27 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -92,6 +92,7 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
private LogicalPlan originLogicalQuery;
private Optional<LogicalPlan> logicalQuery;
private Optional<String> labelName;
+ private Optional<String> branchName;
/**
* When source it's from job scheduler,it will be set.
*/
@@ -102,7 +103,7 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String>
labelName,
Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan>
cte) {
- this(logicalQuery, labelName, insertCtx, cte, true);
+ this(logicalQuery, labelName, insertCtx, cte, true, Optional.empty());
}
/**
@@ -110,7 +111,7 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
*/
public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String>
labelName,
Optional<InsertCommandContext> insertCtx,
Optional<LogicalPlan> cte,
- boolean needNormalizePlan) {
+ boolean needNormalizePlan, Optional<String>
branchName) {
super(PlanType.INSERT_INTO_TABLE_COMMAND);
this.originLogicalQuery = Objects.requireNonNull(logicalQuery,
"logicalQuery should not be null");
this.labelName = Objects.requireNonNull(labelName, "labelName should
not be null");
@@ -118,6 +119,7 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
this.insertCtx = insertCtx;
this.cte = cte;
this.needNormalizePlan = needNormalizePlan;
+ this.branchName = branchName;
}
public LogicalPlan getLogicalQuery() {
@@ -284,6 +286,11 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
String label = this.labelName.orElse(
ctx.isTxnModel() ? null : String.format("label_%x_%x",
ctx.queryId().hi, ctx.queryId().lo));
+ // check branch
+ if (branchName.isPresent() && !(physicalSink instanceof
PhysicalIcebergTableSink)) {
+ throw new AnalysisException("Only support insert data into
iceberg table's branch");
+ }
+
if (physicalSink instanceof PhysicalOlapTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
OlapTable olapTable = (OlapTable) targetTableIf;
@@ -339,12 +346,16 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
} else if (physicalSink instanceof PhysicalIcebergTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
IcebergExternalTable icebergExternalTable =
(IcebergExternalTable) targetTableIf;
+ IcebergInsertCommandContext icebergInsertCtx = insertCtx
+ .map(insertCommandContext ->
(IcebergInsertCommandContext) insertCommandContext)
+ .orElseGet(IcebergInsertCommandContext::new);
+ branchName.ifPresent(notUsed ->
icebergInsertCtx.setBranchName(branchName));
return ExecutorFactory.from(
planner,
dataSink,
physicalSink,
() -> new IcebergInsertExecutor(ctx,
icebergExternalTable, label, planner,
- Optional.of(insertCtx.orElse((new
BaseExternalTableInsertCommandContext()))),
+ Optional.of(icebergInsertCtx),
emptyInsert
)
);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index 925de05998d..0ad5a55e879 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -50,6 +50,7 @@ import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTableSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -92,17 +93,19 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
private final Optional<LogicalPlan> cte;
private AtomicBoolean isCancelled = new AtomicBoolean(false);
private AtomicBoolean isRunning = new AtomicBoolean(false);
+ private Optional<String> branchName;
/**
* constructor
*/
public InsertOverwriteTableCommand(LogicalPlan logicalQuery,
Optional<String> labelName,
- Optional<LogicalPlan> cte) {
+ Optional<LogicalPlan> cte, Optional<String> branchName) {
super(PlanType.INSERT_INTO_TABLE_COMMAND);
this.originLogicalQuery = Objects.requireNonNull(logicalQuery,
"logicalQuery should not be null");
this.logicalQuery = Optional.empty();
this.labelName = Objects.requireNonNull(labelName, "labelName should
not be null");
this.cte = cte;
+ this.branchName = branchName;
}
public void setLabelName(Optional<String> labelName) {
@@ -185,6 +188,13 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
// Do not create temp partition on FE
partitionNames = new ArrayList<>();
}
+
+ // check branch
+ if (branchName.isPresent() && !(physicalTableSink instanceof
PhysicalIcebergTableSink)) {
+ throw new AnalysisException(
+ "Only support insert overwrite into iceberg table's
branch");
+ }
+
InsertOverwriteManager insertOverwriteManager =
Env.getCurrentEnv().getInsertOverwriteManager();
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase(),
targetTable);
isRunning.set(true);
@@ -283,7 +293,7 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
private void runInsertCommand(LogicalPlan logicalQuery,
InsertCommandContext insertCtx,
ConnectContext ctx, StmtExecutor executor) throws Exception {
InsertIntoTableCommand insertCommand = new
InsertIntoTableCommand(logicalQuery, labelName,
- Optional.of(insertCtx), Optional.empty(), false);
+ Optional.of(insertCtx), Optional.empty(), false,
Optional.empty());
insertCommand.run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
String errMsg =
Strings.emptyToNull(ctx.getState().getErrorMessage());
@@ -351,6 +361,7 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
(LogicalPlan) (sink.child(0)));
insertCtx = new IcebergInsertCommandContext();
((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
+ branchName.ifPresent(notUsed -> ((IcebergInsertCommandContext)
insertCtx).setBranchName(branchName));
} else {
throw new UserException("Current catalog does not support insert
overwrite yet.");
}
@@ -378,6 +389,7 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
} else if (logicalQuery instanceof UnboundIcebergTableSink) {
insertCtx = new IcebergInsertCommandContext();
((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
+ branchName.ifPresent(notUsed -> ((IcebergInsertCommandContext)
insertCtx).setBranchName(branchName));
} else {
throw new UserException("Current catalog does not support insert
overwrite yet.");
}
@@ -399,7 +411,7 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
boolean allowAutoPartition =
ctx.getConnectContext().getSessionVariable().isEnableAutoCreateWhenOverwrite();
OlapInsertCommandContext insertCtx = new
OlapInsertCommandContext(allowAutoPartition, true);
InsertIntoTableCommand insertIntoTableCommand = new
InsertIntoTableCommand(
- logicalQuery, labelName, Optional.of(insertCtx),
Optional.empty());
+ logicalQuery, labelName, Optional.of(insertCtx),
Optional.empty(), true, Optional.empty());
return insertIntoTableCommand.getExplainPlanner(logicalPlan, ctx);
}
return Optional.empty();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index a3918a3ce54..af345a9270c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -23,7 +23,7 @@ import
org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.iceberg.IcebergVendedCredentialsProvider;
-import
org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
+import
org.apache.doris.nereids.trees.plans.commands.insert.IcebergInsertCommandContext;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
@@ -150,7 +150,7 @@ public class IcebergTableSink extends
BaseExternalTableDataSink {
}
if (insertCtx.isPresent()) {
- BaseExternalTableInsertCommandContext context =
(BaseExternalTableInsertCommandContext) insertCtx.get();
+ IcebergInsertCommandContext context =
(IcebergInsertCommandContext) insertCtx.get();
tSink.setOverwrite(context.isOverwrite());
}
tDataSink = new TDataSink(TDataSinkType.ICEBERG_TABLE_SINK);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
index dbb266d82c2..f7c89a9c1cb 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
@@ -191,8 +191,8 @@ public class IcebergTransactionTest {
.thenReturn(table);
IcebergTransaction txn = getTxn();
txn.updateIcebergCommitData(ctdList);
- txn.beginInsert(icebergExternalTable);
- txn.finishInsert(NameMapping.createForTest(dbName,
tbWithPartition), Optional.empty());
+ txn.beginInsert(icebergExternalTable, Optional.empty());
+ txn.finishInsert(NameMapping.createForTest(dbName,
tbWithPartition));
txn.commit();
}
@@ -304,8 +304,8 @@ public class IcebergTransactionTest {
IcebergTransaction txn = getTxn();
txn.updateIcebergCommitData(ctdList);
- txn.beginInsert(icebergExternalTable);
- txn.finishInsert(NameMapping.createForTest(dbName,
tbWithPartition), Optional.empty());
+ txn.beginInsert(icebergExternalTable, Optional.empty());
+ txn.finishInsert(NameMapping.createForTest(dbName,
tbWithPartition));
txn.commit();
}
@@ -415,10 +415,10 @@ public class IcebergTransactionTest {
IcebergTransaction txn = getTxn();
txn.updateIcebergCommitData(ctdList);
- txn.beginInsert(icebergExternalTable);
IcebergInsertCommandContext ctx = new
IcebergInsertCommandContext();
+ txn.beginInsert(icebergExternalTable, Optional.of(ctx));
ctx.setOverwrite(true);
- txn.finishInsert(NameMapping.createForTest(dbName,
tbWithPartition), Optional.of(ctx));
+ txn.finishInsert(NameMapping.createForTest(dbName,
tbWithPartition));
txn.commit();
}
@@ -440,10 +440,10 @@ public class IcebergTransactionTest {
.thenReturn(table);
IcebergTransaction txn = getTxn();
- txn.beginInsert(icebergExternalTable);
IcebergInsertCommandContext ctx = new
IcebergInsertCommandContext();
+ txn.beginInsert(icebergExternalTable, Optional.of(ctx));
ctx.setOverwrite(true);
- txn.finishInsert(NameMapping.createForTest(dbName,
tbWithPartition), Optional.of(ctx));
+ txn.finishInsert(NameMapping.createForTest(dbName,
tbWithPartition));
txn.commit();
}
diff --git
a/regression-test/data/external_table_p0/iceberg/iceberg_branch_insert_data.out
b/regression-test/data/external_table_p0/iceberg/iceberg_branch_insert_data.out
new file mode 100644
index 00000000000..4e0a3d0909f
Binary files /dev/null and
b/regression-test/data/external_table_p0/iceberg/iceberg_branch_insert_data.out
differ
diff --git
a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy
b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy
new file mode 100644
index 00000000000..08a72138ee5
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_branch_insert_data",
"p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String catalog_name = "iceberg_branch_insert_data"
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ sql """drop database if exists
${catalog_name}.iceberg_branch_insert_data_db force"""
+ sql """create database ${catalog_name}.iceberg_branch_insert_data_db"""
+ sql """ use ${catalog_name}.iceberg_branch_insert_data_db """
+
+ String tmp_tb1 = catalog_name + "_tmp_tb1"
+ String tmp_tb2 = catalog_name + "_tmp_tb2"
+
+ // create an unpartitioned table
+ sql """ drop table if exists ${tmp_tb1} """
+ sql """ create table ${tmp_tb1} (id int, par string) """
+
+ // create a partitioned table
+ sql """ drop table if exists ${tmp_tb2} """
+ sql """ create table ${tmp_tb2} (id int, par string) PARTITION BY
LIST(par)() """
+
+ sql """ insert into ${tmp_tb1} values (1,'a'),(2,'a'),(3,'b'),(4,'b') """;
+ sql """ insert into ${tmp_tb2} values (1,'a'),(2,'a'),(3,'b'),(4,'b') """;
+
+ sql """ alter table ${tmp_tb1} create tag t1 """
+ sql """ alter table ${tmp_tb1} create branch b1 """
+ sql """ alter table ${tmp_tb1} create branch b2 """
+
+ sql """ alter table ${tmp_tb2} create tag t1 """
+ sql """ alter table ${tmp_tb2} create branch b1 """
+ sql """ alter table ${tmp_tb2} create branch b2 """
+
+ // error with not exists branch
+ test {
+ sql """ insert into ${tmp_tb1}@branch(brrrr1) values (5,'a') """
+ exception "brrrr1 is not founded"
+ }
+ test {
+ sql """ insert into ${tmp_tb2}@branch(brrrr2) values (5,'a') """
+ exception "brrrr2 is not founded"
+ }
+
+ // error with tag
+ test {
+ sql """ insert into ${tmp_tb1}@branch(t1) values (5,'a') """
+ exception "t1 is a tag, not a branch"
+ }
+ test {
+ sql """ insert into ${tmp_tb2}@branch(t1) values (5,'a') """
+ exception "t1 is a tag, not a branch"
+ }
+
+ def execute = { table_name ->
+
+ def query_all = {
+ order_qt_main """ select * from ${table_name} order by id """
+ order_qt_t1 """ select * from ${table_name}@tag(t1) order by id """
+ order_qt_b1 """ select * from ${table_name}@branch(b1) order by id
"""
+ order_qt_b2 """ select * from ${table_name}@branch(b2) order by id
"""
+ }
+
+ query_all()
+
+ sql """ insert into ${table_name}@branch(b1) values (5,'a'),(6,'a') """
+ // 1,2,3,4
+ // 1,2,3,4
+ // 1,2,3,4,5,6
+ // 1,2,3,4
+ query_all()
+
+ sql """ insert into ${table_name} values (7,'c'),(8,'c') """
+ // 1,2,3,4,7,8
+ // 1,2,3,4
+ // 1,2,3,4,5,6
+ // 1,2,3,4
+ query_all()
+
+ sql """ insert overwrite table ${table_name} values (9,'a'),(10,'b')
"""
+ // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+ // 1,2,3,4
+ // 1,2,3,4,5,6
+ // 1,2,3,4
+ query_all()
+
+ sql """ insert overwrite table ${table_name}@branch(b1) values
(11,'a'), (12,'b') """
+ // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+ // 1,2,3,4
+ // 11,12
+ // 1,2,3,4
+ query_all()
+
+ sql """ insert into ${table_name}@branch(b1) select * from
${table_name} """
+ // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+ // 1,2,3,4
+ // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+ // 1,2,3,4
+ query_all()
+
+ sql """ insert overwrite table ${table_name}@branch(b2) select * from
${table_name} """
+ // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+ // 1,2,3,4
+ // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+ // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+ query_all()
+
+ sql """ insert overwrite table ${table_name}@branch(b2) select * from
${table_name}@branch(b1) """
+ // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+ // 1,2,3,4
+ // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+ // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+ query_all()
+
+ sql """ insert overwrite table ${table_name} select * from
${table_name}@branch(b1) """
+ // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+ // 1,2,3,4
+ // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+ // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+ query_all()
+
+ sql """ insert overwrite table ${table_name}@branch(b2) select * from
${table_name}@tag(t1) """
+ // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+ // 1,2,3,4
+ // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+ // Non-partitioned table: 1,2,3,4 Partitioned table: 1,2,3,4,7,8
+ query_all()
+
+ // insert with columns
+ sql """ insert into ${table_name}@branch(b1)(par) values('part');"""
+ query_all()
+ sql """ insert overwrite table ${table_name}@branch(b1)(par)
values('part2');"""
+ query_all()
+
+ }
+
+ execute(tmp_tb1)
+ execute(tmp_tb2)
+
+ // test insert table which not support branch
+ sql """switch internal"""
+ sql """drop database if exists iceberg_branch_insert_data_internal_db"""
+ sql """create database iceberg_branch_insert_data_internal_db"""
+ sql """use iceberg_branch_insert_data_internal_db"""
+ sql """create table iceberg_branch_insert_data_internal_tb (id int, par
string) properties("replication_num" = "1")"""
+
+ test {
+ sql """insert into iceberg_branch_insert_data_internal_tb@branch(b1)
values (1,'a'),(2,'a'),(3,'b'),(4,'b')"""
+ exception "Only support insert data into iceberg table's branch"
+ }
+
+ test {
+ sql """insert overwrite table
iceberg_branch_insert_data_internal_tb@branch(b1) values
(1,'a'),(2,'a'),(3,'b'),(4,'b')"""
+ exception "Only support insert overwrite into iceberg table's branch"
+ }
+}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]