This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 2421a605bd5 branch-3.1: [opt](iceberg) support insert to branch #52751 #53703 (#53693)
2421a605bd5 is described below

commit 2421a605bd597dde51f44717a8e7ca7312922ec5
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Jul 22 03:58:11 2025 -0700

    branch-3.1: [opt](iceberg) support insert to branch #52751 #53703 (#53693)
    
    bp #52751 #53703
    
    ---------
    
    Co-authored-by: wuwenchi <[email protected]>
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |   6 +-
 .../datasource/iceberg/IcebergTransaction.java     |  52 ++++--
 .../doris/nereids/parser/LogicalPlanBuilder.java   |   8 +-
 .../trees/plans/commands/CreateTableCommand.java   |   4 +-
 .../plans/commands/DeleteFromUsingCommand.java     |   2 +-
 .../trees/plans/commands/UpdateCommand.java        |   2 +-
 .../plans/commands/UpdateMvByPartitionCommand.java |   2 +-
 .../insert/IcebergInsertCommandContext.java        |  11 ++
 .../commands/insert/IcebergInsertExecutor.java     |   4 +-
 .../commands/insert/InsertIntoTableCommand.java    |  17 +-
 .../insert/InsertOverwriteTableCommand.java        |  18 +-
 .../org/apache/doris/planner/IcebergTableSink.java |   4 +-
 .../datasource/iceberg/IcebergTransactionTest.java |  16 +-
 .../iceberg/iceberg_branch_insert_data.out         | Bin 0 -> 2980 bytes
 .../iceberg/iceberg_branch_insert_data.groovy      | 191 +++++++++++++++++++++
 15 files changed, 298 insertions(+), 39 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 5d95771127d..698cbc62822 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -130,9 +130,13 @@ constraintStatement
     | SHOW CONSTRAINTS FROM table=multipartIdentifier                     
#showConstraint
     ;
 
+optSpecBranch
+    : ATSIGN BRANCH LEFT_PAREN name=identifier RIGHT_PAREN
+    ;
+
 supportedDmlStatement
     : explain? cte? INSERT (INTO | OVERWRITE TABLE)
-        (tableName=multipartIdentifier | DORIS_INTERNAL_TABLE_ID LEFT_PAREN 
tableId=INTEGER_VALUE RIGHT_PAREN)
+        (tableName=multipartIdentifier (optSpecBranch)? | 
DORIS_INTERNAL_TABLE_ID LEFT_PAREN tableId=INTEGER_VALUE RIGHT_PAREN)
         partitionSpec?  // partition define
         (WITH LABEL labelName=identifier)? cols=identifierList?  // label and 
columns define
         (LEFT_BRACKET hints=identifierSeq RIGHT_BRACKET)?  // hint define
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 9160c3012b5..019b57dface 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -24,7 +24,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.NameMapping;
 import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper;
-import 
org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.IcebergInsertCommandContext;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.thrift.TIcebergCommitData;
 import org.apache.doris.thrift.TUpdateMode;
@@ -38,6 +38,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.OverwriteFiles;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.WriteResult;
@@ -61,6 +62,9 @@ public class IcebergTransaction implements Transaction {
     private org.apache.iceberg.Transaction transaction;
     private final List<TIcebergCommitData> commitDataList = 
Lists.newArrayList();
 
+    private IcebergInsertCommandContext insertCtx;
+    private String branchName;
+
     public IcebergTransaction(IcebergMetadataOps ops) {
         this.ops = ops;
     }
@@ -71,20 +75,34 @@ public class IcebergTransaction implements Transaction {
         }
     }
 
-    public void beginInsert(ExternalTable dorisTable) throws UserException {
+    public void beginInsert(ExternalTable dorisTable, 
Optional<InsertCommandContext> ctx) throws UserException {
+        ctx.ifPresent(c -> this.insertCtx = (IcebergInsertCommandContext) c);
         try {
             ops.getPreExecutionAuthenticator().execute(() -> {
                 // create and start the iceberg transaction
                 this.table = IcebergUtils.getIcebergTable(dorisTable);
+                // check branch
+                if (insertCtx != null && 
insertCtx.getBranchName().isPresent()) {
+                    this.branchName = insertCtx.getBranchName().get();
+                    SnapshotRef branchRef = table.refs().get(branchName);
+                    if (branchRef == null) {
+                        throw new RuntimeException(branchName + " is not 
founded in " + dorisTable.getName());
+                    } else if (!branchRef.isBranch()) {
+                        throw new RuntimeException(
+                            branchName
+                                + " is a tag, not a branch. Tags cannot be 
targets for producing snapshots");
+                    }
+                }
                 this.transaction = table.newTransaction();
             });
         } catch (Exception e) {
-            throw new UserException("Failed to begin insert for iceberg table 
" + dorisTable.getName(), e);
+            throw new UserException("Failed to begin insert for iceberg table 
" + dorisTable.getName()
+                    + "because: " + e.getMessage(), e);
         }
 
     }
 
-    public void finishInsert(NameMapping nameMapping, 
Optional<InsertCommandContext> insertCtx) {
+    public void finishInsert(NameMapping nameMapping) {
         if (LOG.isDebugEnabled()) {
             LOG.info("iceberg table {} insert table finished!", 
nameMapping.getFullLocalName());
         }
@@ -92,8 +110,8 @@ public class IcebergTransaction implements Transaction {
             ops.getPreExecutionAuthenticator().execute(() -> {
                 //create and start the iceberg transaction
                 TUpdateMode updateMode = TUpdateMode.APPEND;
-                if (insertCtx.isPresent()) {
-                    updateMode = ((BaseExternalTableInsertCommandContext) 
insertCtx.get()).isOverwrite()
+                if (insertCtx != null) {
+                    updateMode = insertCtx.isOverwrite()
                             ? TUpdateMode.OVERWRITE
                             : TUpdateMode.APPEND;
                 }
@@ -146,6 +164,9 @@ public class IcebergTransaction implements Transaction {
     private void commitAppendTxn(List<WriteResult> pendingResults) {
         // commit append files.
         AppendFiles appendFiles = 
transaction.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
+        if (branchName != null) {
+            appendFiles = appendFiles.toBranch(branchName);
+        }
         for (WriteResult result : pendingResults) {
             Preconditions.checkState(result.referencedDataFiles().length == 0,
                     "Should have no referenced data files for append.");
@@ -161,11 +182,14 @@ public class IcebergTransaction implements Transaction {
             // 1. if dst_tb is a partitioned table, it will return directly.
             // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will 
be emptied.
             if (!transaction.table().spec().isPartitioned()) {
-                OverwriteFiles overwriteFiles = transaction
-                        .newOverwrite()
-                        .scanManifestsWith(ops.getThreadPoolWithPreAuth());
+                OverwriteFiles overwriteFiles = transaction.newOverwrite();
+                if (branchName != null) {
+                    overwriteFiles = overwriteFiles.toBranch(branchName);
+                }
+                overwriteFiles = 
overwriteFiles.scanManifestsWith(ops.getThreadPoolWithPreAuth());
                 try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
-                    fileScanTasks.forEach(f -> 
overwriteFiles.deleteFile(f.file()));
+                    OverwriteFiles finalOverwriteFiles = overwriteFiles;
+                    fileScanTasks.forEach(f -> 
finalOverwriteFiles.deleteFile(f.file()));
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -175,9 +199,11 @@ public class IcebergTransaction implements Transaction {
         }
 
         // commit replace partitions
-        ReplacePartitions appendPartitionOp = transaction
-                .newReplacePartitions()
-                .scanManifestsWith(ops.getThreadPoolWithPreAuth());
+        ReplacePartitions appendPartitionOp = 
transaction.newReplacePartitions();
+        if (branchName != null) {
+            appendPartitionOp = appendPartitionOp.toBranch(branchName);
+        }
+        appendPartitionOp = 
appendPartitionOp.scanManifestsWith(ops.getThreadPoolWithPreAuth());
         for (WriteResult result : pendingResults) {
             Preconditions.checkState(result.referencedDataFiles().length == 0,
                     "Should have no referenced data files.");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index d847130b922..ea8edffcdbf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -610,6 +610,10 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
             throw new ParseException("tableName and tableId cannot both be 
null");
         }
         Optional<String> labelName = (ctx.labelName == null) ? 
Optional.empty() : Optional.of(ctx.labelName.getText());
+        Optional<String> branchName = Optional.empty();
+        if (ctx.optSpecBranch() != null) {
+            branchName = Optional.of(ctx.optSpecBranch().name.getText());
+        }
         List<String> colNames = ctx.cols == null ? ImmutableList.of() : 
visitIdentifierList(ctx.cols);
         // TODO visit partitionSpecCtx
         LogicalPlan plan = visitQuery(ctx.query());
@@ -638,7 +642,7 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
         }
         LogicalPlan command;
         if (isOverwrite) {
-            command = new InsertOverwriteTableCommand(sink, labelName, cte);
+            command = new InsertOverwriteTableCommand(sink, labelName, cte, 
branchName);
         } else {
             if (ConnectContext.get() != null && 
ConnectContext.get().isTxnModel()
                     && sink.child() instanceof InlineTable
@@ -648,7 +652,7 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
                 //  Now handle it as `insert into select`(a separate load 
job), should fix it as the legacy.
                 command = new BatchInsertIntoTableCommand(sink);
             } else {
-                command = new InsertIntoTableCommand(sink, labelName, 
Optional.empty(), cte);
+                command = new InsertIntoTableCommand(sink, labelName, 
Optional.empty(), cte, true, branchName);
             }
         }
         return withExplain(command, ctx.explain());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
index a00f34d29f0..66e181b38dc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
@@ -176,8 +176,8 @@ public class CreateTableCommand extends Command implements 
ForwardWithSync {
                 ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), 
query);
         try {
             if (!FeConstants.runningUnitTest) {
-                new InsertIntoTableCommand(query, Optional.empty(), 
Optional.empty(), Optional.empty()).run(
-                        ctx, executor);
+                new InsertIntoTableCommand(query, Optional.empty(), 
Optional.empty(),
+                        Optional.empty(), true, Optional.empty()).run(ctx, 
executor);
             }
             if (ctx.getState().getStateType() == MysqlStateType.ERR) {
                 handleFallbackFailedCtas(ctx);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
index de374d939c1..764ff05c00f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
@@ -54,7 +54,7 @@ public class DeleteFromUsingCommand extends DeleteFromCommand 
{
         }
         // NOTE: delete from using command is executed as insert command, so 
txn insert can support it
         new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), 
Optional.empty(), Optional.empty(),
-                Optional.empty()).run(ctx, executor);
+                Optional.empty(), true, Optional.empty()).run(ctx, executor);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
index f79303acf22..9cbedffa2bc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
@@ -97,7 +97,7 @@ public class UpdateCommand extends Command implements 
ForwardWithSync, Explainab
     public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
         // NOTE: update command is executed as insert command, so txn insert 
can support it
         new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), 
Optional.empty(), Optional.empty(),
-                Optional.empty()).run(ctx, executor);
+                Optional.empty(), true, Optional.empty()).run(ctx, executor);
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
index fac66f97abb..89b00cd788d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -86,7 +86,7 @@ public class UpdateMvByPartitionCommand extends 
InsertOverwriteTableCommand {
     private static final Logger LOG = 
LogManager.getLogger(UpdateMvByPartitionCommand.class);
 
     private UpdateMvByPartitionCommand(LogicalPlan logicalQuery) {
-        super(logicalQuery, Optional.empty(), Optional.empty());
+        super(logicalQuery, Optional.empty(), Optional.empty(), 
Optional.empty());
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
index a091405b219..6a921cee10c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
@@ -17,8 +17,19 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import java.util.Optional;
+
 /**
  * For iceberg External Table
  */
 public class IcebergInsertCommandContext extends 
BaseExternalTableInsertCommandContext {
+    private Optional<String> branchName = Optional.empty();
+
+    public Optional<String> getBranchName() {
+        return branchName;
+    }
+
+    public void setBranchName(Optional<String> branchName) {
+        this.branchName = branchName;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
index 1eee1a525be..6f2c465a831 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
@@ -49,7 +49,7 @@ public class IcebergInsertExecutor extends 
BaseExternalTableInsertExecutor {
     @Override
     protected void beforeExec() throws UserException {
         IcebergTransaction transaction = (IcebergTransaction) 
transactionManager.getTransaction(txnId);
-        transaction.beginInsert((IcebergExternalTable) table);
+        transaction.beginInsert((IcebergExternalTable) table, insertCtx);
     }
 
     @Override
@@ -60,7 +60,7 @@ public class IcebergInsertExecutor extends 
BaseExternalTableInsertExecutor {
                 dorisTable.getRemoteDbName(), dorisTable.getRemoteName());
         IcebergTransaction transaction = (IcebergTransaction) 
transactionManager.getTransaction(txnId);
         this.loadedRows = transaction.getUpdateCnt();
-        transaction.finishInsert(nameMapping, insertCtx);
+        transaction.finishInsert(nameMapping);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 29c81229e58..bd46076bd27 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -92,6 +92,7 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
     private LogicalPlan originLogicalQuery;
     private Optional<LogicalPlan> logicalQuery;
     private Optional<String> labelName;
+    private Optional<String> branchName;
     /**
      * When source it's from job scheduler,it will be set.
      */
@@ -102,7 +103,7 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
 
     public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> 
labelName,
             Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan> 
cte) {
-        this(logicalQuery, labelName, insertCtx, cte, true);
+        this(logicalQuery, labelName, insertCtx, cte, true, Optional.empty());
     }
 
     /**
@@ -110,7 +111,7 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
      */
     public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> 
labelName,
                                   Optional<InsertCommandContext> insertCtx, 
Optional<LogicalPlan> cte,
-                                  boolean needNormalizePlan) {
+                                  boolean needNormalizePlan, Optional<String> 
branchName) {
         super(PlanType.INSERT_INTO_TABLE_COMMAND);
         this.originLogicalQuery = Objects.requireNonNull(logicalQuery, 
"logicalQuery should not be null");
         this.labelName = Objects.requireNonNull(labelName, "labelName should 
not be null");
@@ -118,6 +119,7 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
         this.insertCtx = insertCtx;
         this.cte = cte;
         this.needNormalizePlan = needNormalizePlan;
+        this.branchName = branchName;
     }
 
     public LogicalPlan getLogicalQuery() {
@@ -284,6 +286,11 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
             String label = this.labelName.orElse(
                     ctx.isTxnModel() ? null : String.format("label_%x_%x", 
ctx.queryId().hi, ctx.queryId().lo));
 
+            // check branch
+            if (branchName.isPresent() && !(physicalSink instanceof 
PhysicalIcebergTableSink)) {
+                throw new AnalysisException("Only support insert data into 
iceberg table's branch");
+            }
+
             if (physicalSink instanceof PhysicalOlapTableSink) {
                 boolean emptyInsert = childIsEmptyRelation(physicalSink);
                 OlapTable olapTable = (OlapTable) targetTableIf;
@@ -339,12 +346,16 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
             } else if (physicalSink instanceof PhysicalIcebergTableSink) {
                 boolean emptyInsert = childIsEmptyRelation(physicalSink);
                 IcebergExternalTable icebergExternalTable = 
(IcebergExternalTable) targetTableIf;
+                IcebergInsertCommandContext icebergInsertCtx = insertCtx
+                        .map(insertCommandContext -> 
(IcebergInsertCommandContext) insertCommandContext)
+                        .orElseGet(IcebergInsertCommandContext::new);
+                branchName.ifPresent(notUsed -> 
icebergInsertCtx.setBranchName(branchName));
                 return ExecutorFactory.from(
                         planner,
                         dataSink,
                         physicalSink,
                         () -> new IcebergInsertExecutor(ctx, 
icebergExternalTable, label, planner,
-                                Optional.of(insertCtx.orElse((new 
BaseExternalTableInsertCommandContext()))),
+                                Optional.of(icebergInsertCtx),
                                 emptyInsert
                         )
                 );
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index 925de05998d..0ad5a55e879 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -50,6 +50,7 @@ import org.apache.doris.nereids.trees.plans.commands.Command;
 import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalTableSink;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -92,17 +93,19 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
     private final Optional<LogicalPlan> cte;
     private AtomicBoolean isCancelled = new AtomicBoolean(false);
     private AtomicBoolean isRunning = new AtomicBoolean(false);
+    private Optional<String> branchName;
 
     /**
      * constructor
      */
     public InsertOverwriteTableCommand(LogicalPlan logicalQuery, 
Optional<String> labelName,
-            Optional<LogicalPlan> cte) {
+            Optional<LogicalPlan> cte, Optional<String> branchName) {
         super(PlanType.INSERT_INTO_TABLE_COMMAND);
         this.originLogicalQuery = Objects.requireNonNull(logicalQuery, 
"logicalQuery should not be null");
         this.logicalQuery = Optional.empty();
         this.labelName = Objects.requireNonNull(labelName, "labelName should 
not be null");
         this.cte = cte;
+        this.branchName = branchName;
     }
 
     public void setLabelName(Optional<String> labelName) {
@@ -185,6 +188,13 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
             // Do not create temp partition on FE
             partitionNames = new ArrayList<>();
         }
+
+        // check branch
+        if (branchName.isPresent() && !(physicalTableSink instanceof 
PhysicalIcebergTableSink)) {
+            throw new AnalysisException(
+                    "Only support insert overwrite into iceberg table's 
branch");
+        }
+
         InsertOverwriteManager insertOverwriteManager = 
Env.getCurrentEnv().getInsertOverwriteManager();
         
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase(), 
targetTable);
         isRunning.set(true);
@@ -283,7 +293,7 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
     private void runInsertCommand(LogicalPlan logicalQuery, 
InsertCommandContext insertCtx,
             ConnectContext ctx, StmtExecutor executor) throws Exception {
         InsertIntoTableCommand insertCommand = new 
InsertIntoTableCommand(logicalQuery, labelName,
-                Optional.of(insertCtx), Optional.empty(), false);
+                Optional.of(insertCtx), Optional.empty(), false, 
Optional.empty());
         insertCommand.run(ctx, executor);
         if (ctx.getState().getStateType() == MysqlStateType.ERR) {
             String errMsg = 
Strings.emptyToNull(ctx.getState().getErrorMessage());
@@ -351,6 +361,7 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
                     (LogicalPlan) (sink.child(0)));
             insertCtx = new IcebergInsertCommandContext();
             ((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
+            branchName.ifPresent(notUsed -> ((IcebergInsertCommandContext) 
insertCtx).setBranchName(branchName));
         } else {
             throw new UserException("Current catalog does not support insert 
overwrite yet.");
         }
@@ -378,6 +389,7 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
         } else if (logicalQuery instanceof UnboundIcebergTableSink) {
             insertCtx = new IcebergInsertCommandContext();
             ((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
+            branchName.ifPresent(notUsed -> ((IcebergInsertCommandContext) 
insertCtx).setBranchName(branchName));
         } else {
             throw new UserException("Current catalog does not support insert 
overwrite yet.");
         }
@@ -399,7 +411,7 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
             boolean allowAutoPartition = 
ctx.getConnectContext().getSessionVariable().isEnableAutoCreateWhenOverwrite();
             OlapInsertCommandContext insertCtx = new 
OlapInsertCommandContext(allowAutoPartition, true);
             InsertIntoTableCommand insertIntoTableCommand = new 
InsertIntoTableCommand(
-                    logicalQuery, labelName, Optional.of(insertCtx), 
Optional.empty());
+                    logicalQuery, labelName, Optional.of(insertCtx), 
Optional.empty(), true, Optional.empty());
             return insertIntoTableCommand.getExplainPlanner(logicalPlan, ctx);
         }
         return Optional.empty();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index a3918a3ce54..af345a9270c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -23,7 +23,7 @@ import 
org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.iceberg.IcebergVendedCredentialsProvider;
-import 
org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.IcebergInsertCommandContext;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
@@ -150,7 +150,7 @@ public class IcebergTableSink extends 
BaseExternalTableDataSink {
         }
 
         if (insertCtx.isPresent()) {
-            BaseExternalTableInsertCommandContext context = 
(BaseExternalTableInsertCommandContext) insertCtx.get();
+            IcebergInsertCommandContext context = 
(IcebergInsertCommandContext) insertCtx.get();
             tSink.setOverwrite(context.isOverwrite());
         }
         tDataSink = new TDataSink(TDataSinkType.ICEBERG_TABLE_SINK);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
index dbb266d82c2..f7c89a9c1cb 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
@@ -191,8 +191,8 @@ public class IcebergTransactionTest {
                     .thenReturn(table);
             IcebergTransaction txn = getTxn();
             txn.updateIcebergCommitData(ctdList);
-            txn.beginInsert(icebergExternalTable);
-            txn.finishInsert(NameMapping.createForTest(dbName, 
tbWithPartition), Optional.empty());
+            txn.beginInsert(icebergExternalTable, Optional.empty());
+            txn.finishInsert(NameMapping.createForTest(dbName, 
tbWithPartition));
             txn.commit();
         }
 
@@ -304,8 +304,8 @@ public class IcebergTransactionTest {
 
             IcebergTransaction txn = getTxn();
             txn.updateIcebergCommitData(ctdList);
-            txn.beginInsert(icebergExternalTable);
-            txn.finishInsert(NameMapping.createForTest(dbName, 
tbWithPartition), Optional.empty());
+            txn.beginInsert(icebergExternalTable, Optional.empty());
+            txn.finishInsert(NameMapping.createForTest(dbName, 
tbWithPartition));
             txn.commit();
         }
 
@@ -415,10 +415,10 @@ public class IcebergTransactionTest {
 
             IcebergTransaction txn = getTxn();
             txn.updateIcebergCommitData(ctdList);
-            txn.beginInsert(icebergExternalTable);
             IcebergInsertCommandContext ctx = new 
IcebergInsertCommandContext();
+            txn.beginInsert(icebergExternalTable, Optional.of(ctx));
             ctx.setOverwrite(true);
-            txn.finishInsert(NameMapping.createForTest(dbName, 
tbWithPartition), Optional.of(ctx));
+            txn.finishInsert(NameMapping.createForTest(dbName, 
tbWithPartition));
             txn.commit();
         }
 
@@ -440,10 +440,10 @@ public class IcebergTransactionTest {
                     .thenReturn(table);
 
             IcebergTransaction txn = getTxn();
-            txn.beginInsert(icebergExternalTable);
             IcebergInsertCommandContext ctx = new 
IcebergInsertCommandContext();
+            txn.beginInsert(icebergExternalTable, Optional.of(ctx));
             ctx.setOverwrite(true);
-            txn.finishInsert(NameMapping.createForTest(dbName, 
tbWithPartition), Optional.of(ctx));
+            txn.finishInsert(NameMapping.createForTest(dbName, 
tbWithPartition));
             txn.commit();
         }
 
diff --git 
a/regression-test/data/external_table_p0/iceberg/iceberg_branch_insert_data.out 
b/regression-test/data/external_table_p0/iceberg/iceberg_branch_insert_data.out
new file mode 100644
index 00000000000..4e0a3d0909f
Binary files /dev/null and 
b/regression-test/data/external_table_p0/iceberg/iceberg_branch_insert_data.out 
differ
diff --git 
a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy
 
b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy
new file mode 100644
index 00000000000..08a72138ee5
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_branch_insert_data", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "iceberg_branch_insert_data"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """drop database if exists 
${catalog_name}.iceberg_branch_insert_data_db force"""
+    sql """create database ${catalog_name}.iceberg_branch_insert_data_db"""
+    sql """ use ${catalog_name}.iceberg_branch_insert_data_db """
+
+    String tmp_tb1 = catalog_name + "_tmp_tb1"
+    String tmp_tb2 = catalog_name + "_tmp_tb2"
+
+    // create an unpartitioned table
+    sql """ drop table if exists ${tmp_tb1} """
+    sql """ create table ${tmp_tb1} (id int, par string) """
+
+    // create a partitioned table
+    sql """ drop table if exists ${tmp_tb2} """
+    sql """ create table ${tmp_tb2} (id int, par string) PARTITION BY 
LIST(par)() """
+
+    sql """ insert into ${tmp_tb1} values (1,'a'),(2,'a'),(3,'b'),(4,'b') """;
+    sql """ insert into ${tmp_tb2} values (1,'a'),(2,'a'),(3,'b'),(4,'b') """;
+
+    sql """ alter table ${tmp_tb1} create tag t1 """
+    sql """ alter table ${tmp_tb1} create branch b1 """
+    sql """ alter table ${tmp_tb1} create branch b2 """
+
+    sql """ alter table ${tmp_tb2} create tag t1 """
+    sql """ alter table ${tmp_tb2} create branch b1 """
+    sql """ alter table ${tmp_tb2} create branch b2 """
+
+    // inserting into a branch that does not exist must fail
+    test {
+        sql """ insert into ${tmp_tb1}@branch(brrrr1) values (5,'a') """
+        exception "brrrr1 is not founded"
+    }
+    test {
+        sql """ insert into ${tmp_tb2}@branch(brrrr2) values (5,'a') """
+        exception "brrrr2 is not founded"
+    }
+
+    // error with tag
+    test {
+        sql """ insert into ${tmp_tb1}@branch(t1) values (5,'a') """
+        exception "t1 is a tag, not a branch"
+    }
+    test {
+        sql """ insert into ${tmp_tb2}@branch(t1) values (5,'a') """
+        exception "t1 is a tag, not a branch"
+    }
+
+    def execute = { table_name ->
+
+        def query_all = {
+            order_qt_main """ select * from ${table_name} order by id """
+            order_qt_t1 """ select * from ${table_name}@tag(t1) order by id """
+            order_qt_b1 """ select * from ${table_name}@branch(b1) order by id 
"""
+            order_qt_b2 """ select * from ${table_name}@branch(b2) order by id 
"""
+        }
+
+        query_all()
+
+        sql """ insert into ${table_name}@branch(b1) values (5,'a'),(6,'a') """
+        // 1,2,3,4
+        // 1,2,3,4
+        // 1,2,3,4,5,6
+        // 1,2,3,4
+        query_all()
+
+        sql """ insert into ${table_name} values (7,'c'),(8,'c') """
+        // 1,2,3,4,7,8
+        // 1,2,3,4
+        // 1,2,3,4,5,6
+        // 1,2,3,4
+        query_all()
+
+        sql """ insert overwrite table ${table_name} values (9,'a'),(10,'b') 
"""
+        // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+        // 1,2,3,4
+        // 1,2,3,4,5,6
+        // 1,2,3,4
+        query_all()
+
+        sql """ insert overwrite table ${table_name}@branch(b1) values 
(11,'a'), (12,'b') """
+        // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+        // 1,2,3,4
+        // 11,12
+        // 1,2,3,4
+        query_all()
+
+        sql """ insert into ${table_name}@branch(b1) select * from 
${table_name} """
+        // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+        // 1,2,3,4
+        // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+        // 1,2,3,4
+        query_all()
+
+        sql """ insert overwrite table ${table_name}@branch(b2) select * from 
${table_name} """
+        // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+        // 1,2,3,4
+        // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+        // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+        query_all()
+
+        sql """ insert overwrite table ${table_name}@branch(b2) select * from 
${table_name}@branch(b1) """
+        // Non-partitioned table: 9,10 Partitioned table: 7,8,9,10
+        // 1,2,3,4
+        // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+        // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+        query_all()
+
+        sql """ insert overwrite table ${table_name} select * from 
${table_name}@branch(b1) """
+        // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+        // 1,2,3,4
+        // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+        // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+        query_all()
+
+        sql """ insert overwrite table ${table_name}@branch(b2) select * from 
${table_name}@tag(t1) """
+        // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+        // 1,2,3,4
+        // Non-partitioned table: 9,10,11,12 Partitioned table: 7,8,9,10,11,12
+        // Non-partitioned table: 1,2,3,4 Partitioned table: 1,2,3,4,7,8
+        query_all()
+
+        // insert with columns
+        sql """ insert into ${table_name}@branch(b1)(par) values('part');"""
+        query_all()
+        sql """ insert overwrite table ${table_name}@branch(b1)(par) 
values('part2');"""
+        query_all()
+        
+    }
+
+    execute(tmp_tb1)
+    execute(tmp_tb2)
+
+    // inserting into a branch of a table type that does not support branches must fail
+    sql """switch internal"""
+    sql """drop database if exists iceberg_branch_insert_data_internal_db"""
+    sql """create database iceberg_branch_insert_data_internal_db"""
+    sql """use iceberg_branch_insert_data_internal_db"""
+    sql """create table iceberg_branch_insert_data_internal_tb (id int, par 
string) properties("replication_num" = "1")"""
+
+    test {
+        sql """insert into iceberg_branch_insert_data_internal_tb@branch(b1) 
values (1,'a'),(2,'a'),(3,'b'),(4,'b')"""
+        exception "Only support insert data into iceberg table's branch"
+    }
+
+    test {
+        sql """insert overwrite table 
iceberg_branch_insert_data_internal_tb@branch(b1) values 
(1,'a'),(2,'a'),(3,'b'),(4,'b')"""
+        exception "Only support insert overwrite into iceberg table's branch"
+    }
+}
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to