This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 1071eef5182 branch-4.1:[fix](iceberg) Reject COW table row-level DML
(#63950) (#63976)
1071eef5182 is described below
commit 1071eef51825b9da9c6b2a14ff9c1722ff927995
Author: daidai <[email protected]>
AuthorDate: Tue Jun 2 14:17:54 2026 +0800
branch-4.1:[fix](iceberg) Reject COW table row-level DML (#63950) (#63976)
### What problem does this PR solve?
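Problem Summary:
pick #63950 (backport to branch-4.1).
Doris cannot perform row-level DML on Iceberg copy-on-write tables, so
DELETE, UPDATE and MERGE INTO now fail during analysis when the table's
write.delete.mode / write.update.mode / write.merge.mode is copy-on-write
(which is also the effective mode when the property is unset). Iceberg
tables created through Doris now default to format-version 2 with
merge-on-read delete/update/merge modes unless these properties are set
explicitly in CREATE TABLE.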
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../datasource/iceberg/IcebergMetadataOps.java | 6 ++
.../trees/plans/commands/IcebergDeleteCommand.java | 5 +-
.../plans/commands/IcebergDmlCommandUtils.java | 61 +++++++++++++++
.../trees/plans/commands/IcebergMergeCommand.java | 7 +-
.../trees/plans/commands/IcebergUpdateCommand.java | 5 +-
.../datasource/iceberg/CreateIcebergTableTest.java | 42 ++++++++++
.../iceberg/IcebergDDLAndDMLPlanTest.java | 15 +++-
.../plans/commands/IcebergDmlCommandUtilsTest.java | 91 ++++++++++++++++++++++
.../dml/test_iceberg_merge_into_advanced.groovy | 50 +++++++++---
.../dml/test_iceberg_merge_into_basic.groovy | 10 ++-
.../dml/test_iceberg_update_delete_advanced.groovy | 42 ++++++++--
.../dml/test_iceberg_update_delete_basic.groovy | 10 ++-
...eberg_v3_row_lineage_update_delete_merge.groovy | 10 ++-
13 files changed, 326 insertions(+), 28 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index a82e59157ba..d447bf531e8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -55,10 +55,12 @@ import com.google.common.base.Splitter;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
@@ -359,6 +361,10 @@ public class IcebergMetadataOps implements
ExternalMetadataOps {
Schema schema = new
Schema(visit.asNestedType().asStructType().fields());
Map<String, String> properties = createTableInfo.getProperties();
properties.put(ExternalCatalog.DORIS_VERSION,
ExternalCatalog.DORIS_VERSION_VALUE);
+ properties.putIfAbsent(TableProperties.FORMAT_VERSION, "2");
+ properties.putIfAbsent(TableProperties.DELETE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
+ properties.putIfAbsent(TableProperties.UPDATE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
+ properties.putIfAbsent(TableProperties.MERGE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
PartitionSpec partitionSpec =
IcebergUtils.solveIcebergPartitionSpec(createTableInfo.getPartitionDesc(),
schema);
// Build and create table with optional sort order
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergDeleteCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergDeleteCommand.java
index ce48e03be99..370c164d193 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergDeleteCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergDeleteCommand.java
@@ -114,6 +114,7 @@ public class IcebergDeleteCommand extends Command
implements ForwardWithSync, Ex
}
IcebergExternalTable icebergTable = (IcebergExternalTable) table;
+ IcebergDmlCommandUtils.checkDeleteMode(icebergTable);
// Verify table format version (must be v2+ for delete support)
// org.apache.iceberg.Table icebergTableObj =
icebergTable.getIcebergTable();
@@ -283,10 +284,12 @@ public class IcebergDeleteCommand extends Command
implements ForwardWithSync, Ex
if (!(table instanceof IcebergExternalTable)) {
throw new AnalysisException("Table must be IcebergExternalTable in
DELETE command");
}
+ IcebergExternalTable icebergTable = (IcebergExternalTable) table;
+ IcebergDmlCommandUtils.checkDeleteMode(icebergTable);
long previousTargetTableId = ctx.getIcebergRowIdTargetTableId();
ctx.setIcebergRowIdTargetTableId(table.getId());
try {
- return completeQueryPlan(ctx, logicalQuery, (IcebergExternalTable)
table);
+ return completeQueryPlan(ctx, logicalQuery, icebergTable);
} finally {
ctx.setIcebergRowIdTargetTableId(previousTargetTableId);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergDmlCommandUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergDmlCommandUtils.java
new file mode 100644
index 00000000000..df721b14380
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergDmlCommandUtils.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.TableProperties;
+
+import java.util.Map;
+
+/**
+ * Helpers for Iceberg row-level DML commands.
+ */
+final class IcebergDmlCommandUtils {
+ private IcebergDmlCommandUtils() {
+ }
+
+ static void checkDeleteMode(IcebergExternalTable table) {
+ checkNotCopyOnWrite(table, "DELETE", TableProperties.DELETE_MODE,
+ TableProperties.DELETE_MODE_DEFAULT);
+ }
+
+ static void checkUpdateMode(IcebergExternalTable table) {
+ checkNotCopyOnWrite(table, "UPDATE", TableProperties.UPDATE_MODE,
+ TableProperties.UPDATE_MODE_DEFAULT);
+ }
+
+ static void checkMergeMode(IcebergExternalTable table) {
+ checkNotCopyOnWrite(table, "MERGE INTO", TableProperties.MERGE_MODE,
+ TableProperties.MERGE_MODE_DEFAULT);
+ }
+
+ private static void checkNotCopyOnWrite(IcebergExternalTable table, String
operation,
+ String modeProperty, String defaultMode) {
+ Map<String, String> properties = table.getIcebergTable().properties();
+ String mode = properties.getOrDefault(modeProperty, defaultMode);
+ if
(RowLevelOperationMode.COPY_ON_WRITE.modeName().equalsIgnoreCase(mode)) {
+ throw new AnalysisException(String.format(
+ "Doris does not support %s on Iceberg copy-on-write
tables. "
+ + "Set table property '%s' to 'merge-on-read'.",
+ operation, modeProperty));
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergMergeCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergMergeCommand.java
index 7770d147812..59eb6a6d6ac 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergMergeCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergMergeCommand.java
@@ -139,6 +139,7 @@ public class IcebergMergeCommand extends Command implements
ForwardWithSync, Exp
+ "Table " + Util.getTempTableDisplayName(table.getName())
+ " is not an Iceberg table.");
}
IcebergExternalTable icebergTable = (IcebergExternalTable) table;
+ IcebergDmlCommandUtils.checkMergeMode(icebergTable);
long previousTargetTableId = ctx.getIcebergRowIdTargetTableId();
ctx.setIcebergRowIdTargetTableId(icebergTable.getId());
try {
@@ -156,10 +157,12 @@ public class IcebergMergeCommand extends Command
implements ForwardWithSync, Exp
throw new AnalysisException("MERGE INTO can only be used on
Iceberg tables. "
+ "Table " + Util.getTempTableDisplayName(table.getName())
+ " is not an Iceberg table.");
}
+ IcebergExternalTable icebergTable = (IcebergExternalTable) table;
+ IcebergDmlCommandUtils.checkMergeMode(icebergTable);
long previousTargetTableId = ctx.getIcebergRowIdTargetTableId();
- ctx.setIcebergRowIdTargetTableId(((IcebergExternalTable)
table).getId());
+ ctx.setIcebergRowIdTargetTableId(icebergTable.getId());
try {
- return buildMergePlan(ctx, (IcebergExternalTable) table);
+ return buildMergePlan(ctx, icebergTable);
} finally {
ctx.setIcebergRowIdTargetTableId(previousTargetTableId);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergUpdateCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergUpdateCommand.java
index 2ed7e0ed49b..8759343e055 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergUpdateCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergUpdateCommand.java
@@ -114,6 +114,7 @@ public class IcebergUpdateCommand extends Command
implements ForwardWithSync, Ex
}
IcebergExternalTable icebergTable = (IcebergExternalTable) table;
+ IcebergDmlCommandUtils.checkUpdateMode(icebergTable);
// Verify table format version (must be v2+ for update support)
// org.apache.iceberg.Table icebergTableObj =
icebergTable.getIcebergTable();
@@ -305,10 +306,12 @@ public class IcebergUpdateCommand extends Command
implements ForwardWithSync, Ex
if (!(table instanceof IcebergExternalTable)) {
throw new AnalysisException("Table must be IcebergExternalTable in
UPDATE command");
}
+ IcebergExternalTable icebergTable = (IcebergExternalTable) table;
+ IcebergDmlCommandUtils.checkUpdateMode(icebergTable);
long previousTargetTableId = ctx.getIcebergRowIdTargetTableId();
ctx.setIcebergRowIdTargetTableId(table.getId());
try {
- return buildMergePlan(ctx, logicalQuery, assignments,
(IcebergExternalTable) table);
+ return buildMergePlan(ctx, logicalQuery, assignments,
icebergTable);
} finally {
ctx.setIcebergRowIdTargetTableId(previousTargetTableId);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java
index 3422100de0f..909359146eb 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java
@@ -28,9 +28,12 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Maps;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -102,6 +105,40 @@ public class CreateIcebergTableTest {
Assert.assertEquals("b", table.properties().get("a"));
}
+ @Test
+ public void testDefaultProperties() throws UserException {
+ TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+ String sql = "create table " + tb + " (id int) engine = iceberg";
+ createTable(sql);
+ Table table = ops.getCatalog().loadTable(tb);
+ Assert.assertEquals(2, getFormatVersion(table));
+ Assert.assertEquals(RowLevelOperationMode.MERGE_ON_READ.modeName(),
+ table.properties().get(TableProperties.DELETE_MODE));
+ Assert.assertEquals(RowLevelOperationMode.MERGE_ON_READ.modeName(),
+ table.properties().get(TableProperties.UPDATE_MODE));
+ Assert.assertEquals(RowLevelOperationMode.MERGE_ON_READ.modeName(),
+ table.properties().get(TableProperties.MERGE_MODE));
+ }
+
+ @Test
+ public void testExplicitProperties() throws UserException {
+ TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+ String sql = "create table " + tb + " (id int) engine = iceberg
properties("
+ + "\"format-version\"=\"1\", "
+ + "\"write.delete.mode\"=\"copy-on-write\", "
+ + "\"write.update.mode\"=\"copy-on-write\", "
+ + "\"write.merge.mode\"=\"copy-on-write\")";
+ createTable(sql);
+ Table table = ops.getCatalog().loadTable(tb);
+ Assert.assertEquals(1, getFormatVersion(table));
+ Assert.assertEquals(RowLevelOperationMode.COPY_ON_WRITE.modeName(),
+ table.properties().get(TableProperties.DELETE_MODE));
+ Assert.assertEquals(RowLevelOperationMode.COPY_ON_WRITE.modeName(),
+ table.properties().get(TableProperties.UPDATE_MODE));
+ Assert.assertEquals(RowLevelOperationMode.COPY_ON_WRITE.modeName(),
+ table.properties().get(TableProperties.MERGE_MODE));
+ }
+
@Test
public void testType() throws UserException {
TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
@@ -191,6 +228,11 @@ public class CreateIcebergTableTest {
return s.replaceAll("-", "");
}
+ private int getFormatVersion(Table table) {
+ Assert.assertTrue(table instanceof BaseTable);
+ return ((BaseTable) table).operations().current().formatVersion();
+ }
+
@Test
public void testDropDB() {
try {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
index dd94f6daed1..143438a71bb 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
@@ -60,8 +60,10 @@ import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -142,7 +144,11 @@ public class IcebergDDLAndDMLPlanTest extends
TestWithFeService {
TableIdentifier.of(dbName, tableName),
icebergSchema,
PartitionSpec.unpartitioned(),
- ImmutableMap.of("format-version", "2"));
+ ImmutableMap.of(
+ TableProperties.FORMAT_VERSION, "2",
+ TableProperties.DELETE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName(),
+ TableProperties.UPDATE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName(),
+ TableProperties.MERGE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName()));
List<Column> schema = ImmutableList.of(
new Column("id", PrimitiveType.INT),
@@ -172,7 +178,12 @@ public class IcebergDDLAndDMLPlanTest extends
TestWithFeService {
Table mockedIcebergTable = Mockito.mock(Table.class);
PartitionSpec mockedSpec = Mockito.mock(PartitionSpec.class);
Mockito.doReturn(false).when(mockedSpec).isPartitioned();
- Mockito.doReturn(ImmutableMap.of("format-version",
"2")).when(mockedIcebergTable).properties();
+ Mockito.doReturn(ImmutableMap.of(
+ TableProperties.FORMAT_VERSION, "2",
+ TableProperties.DELETE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName(),
+ TableProperties.UPDATE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName(),
+ TableProperties.MERGE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName()))
+ .when(mockedIcebergTable).properties();
Mockito.doReturn(mockedSpec).when(mockedIcebergTable).spec();
Mockito.doReturn(ImmutableMap.<Integer,
PartitionSpec>of()).when(mockedIcebergTable).specs();
Mockito.doReturn(icebergSchema).when(mockedIcebergTable).schema();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/IcebergDmlCommandUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/IcebergDmlCommandUtilsTest.java
new file mode 100644
index 00000000000..562484a7dc9
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/IcebergDmlCommandUtilsTest.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IcebergDmlCommandUtilsTest {
+
+ @Test
+ public void testDefaultModesRejectCopyOnWriteOperations() {
+ IcebergExternalTable table = mockIcebergExternalTable(new HashMap<>());
+
+ assertCopyOnWriteException(() ->
IcebergDmlCommandUtils.checkDeleteMode(table),
+ "DELETE", TableProperties.DELETE_MODE);
+ assertCopyOnWriteException(() ->
IcebergDmlCommandUtils.checkUpdateMode(table),
+ "UPDATE", TableProperties.UPDATE_MODE);
+ assertCopyOnWriteException(() ->
IcebergDmlCommandUtils.checkMergeMode(table),
+ "MERGE INTO", TableProperties.MERGE_MODE);
+ }
+
+ @Test
+ public void testExplicitCopyOnWriteModeRejectsOperation() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(TableProperties.DELETE_MODE,
RowLevelOperationMode.COPY_ON_WRITE.modeName());
+ properties.put(TableProperties.UPDATE_MODE,
RowLevelOperationMode.COPY_ON_WRITE.modeName());
+ properties.put(TableProperties.MERGE_MODE,
RowLevelOperationMode.COPY_ON_WRITE.modeName());
+ IcebergExternalTable table = mockIcebergExternalTable(properties);
+
+ assertCopyOnWriteException(() ->
IcebergDmlCommandUtils.checkDeleteMode(table),
+ "DELETE", TableProperties.DELETE_MODE);
+ assertCopyOnWriteException(() ->
IcebergDmlCommandUtils.checkUpdateMode(table),
+ "UPDATE", TableProperties.UPDATE_MODE);
+ assertCopyOnWriteException(() ->
IcebergDmlCommandUtils.checkMergeMode(table),
+ "MERGE INTO", TableProperties.MERGE_MODE);
+ }
+
+ @Test
+ public void testMergeOnReadModeAllowsOperation() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(TableProperties.DELETE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
+ properties.put(TableProperties.UPDATE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
+ properties.put(TableProperties.MERGE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
+ IcebergExternalTable table = mockIcebergExternalTable(properties);
+
+ Assertions.assertDoesNotThrow(() ->
IcebergDmlCommandUtils.checkDeleteMode(table));
+ Assertions.assertDoesNotThrow(() ->
IcebergDmlCommandUtils.checkUpdateMode(table));
+ Assertions.assertDoesNotThrow(() ->
IcebergDmlCommandUtils.checkMergeMode(table));
+ }
+
+ private static void assertCopyOnWriteException(Runnable action, String
operation, String property) {
+ AnalysisException exception =
Assertions.assertThrows(AnalysisException.class, action::run);
+ Assertions.assertTrue(exception.getMessage().contains(operation));
+
Assertions.assertTrue(exception.getMessage().contains("copy-on-write"));
+ Assertions.assertTrue(exception.getMessage().contains(property));
+ }
+
+ private static IcebergExternalTable mockIcebergExternalTable(Map<String,
String> properties) {
+ Table icebergTable = Mockito.mock(Table.class);
+ Mockito.when(icebergTable.properties()).thenReturn(properties);
+
+ IcebergExternalTable table = Mockito.mock(IcebergExternalTable.class);
+ Mockito.when(table.getIcebergTable()).thenReturn(icebergTable);
+ return table;
+ }
+}
diff --git
a/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_merge_into_advanced.groovy
b/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_merge_into_advanced.groovy
index 35e64679a2f..232b10c6666 100644
---
a/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_merge_into_advanced.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_merge_into_advanced.groovy
@@ -77,7 +77,10 @@ suite("test_iceberg_merge_into_advanced",
"p0,external,iceberg,external_docker,e
) ENGINE=iceberg
PROPERTIES (
"format-version" = "1",
- "write.format.default" = "${format}"
+ "write.format.default" = "${format}",
+ "write.delete.mode" = "merge-on-read",
+ "write.update.mode" = "merge-on-read",
+ "write.merge.mode" = "merge-on-read"
)
"""
sql """INSERT INTO ${v1TableName} VALUES (1, 'A')"""
@@ -93,6 +96,30 @@ suite("test_iceberg_merge_into_advanced",
"p0,external,iceberg,external_docker,e
}
sql """drop table if exists ${v1TableName}"""
+ // 1. Error handling for copy-on-write tables
+ String cowTableName = "${tableName}_cow_${format}"
+ sql """drop table if exists ${cowTableName}"""
+ sql """
+ CREATE TABLE ${cowTableName} (
+ id INT,
+ name STRING
+ ) ENGINE=iceberg
+ PROPERTIES (
+ "format-version" = "2",
+ "write.format.default" = "${format}",
+ "write.merge.mode" = "copy-on-write"
+ )
+ """
+ test {
+ sql """
+ MERGE INTO ${cowTableName} t
+ USING (SELECT 1 as id, 'B' as name) s
+ ON t.id = s.id
+ WHEN MATCHED THEN UPDATE SET name = s.name
+ """
+ exception "Doris does not support MERGE INTO on Iceberg
copy-on-write tables"
+ }
+
String formatTableName = "${tableName}_${format}"
sql """drop table if exists ${formatTableName}"""
@@ -104,7 +131,10 @@ suite("test_iceberg_merge_into_advanced",
"p0,external,iceberg,external_docker,e
) ENGINE=iceberg
PROPERTIES (
"format-version" = "2",
- "write.format.default" = "${format}"
+ "write.format.default" = "${format}",
+ "write.delete.mode" = "merge-on-read",
+ "write.update.mode" = "merge-on-read",
+ "write.merge.mode" = "merge-on-read"
)
"""
@@ -115,7 +145,7 @@ suite("test_iceberg_merge_into_advanced",
"p0,external,iceberg,external_docker,e
(3, 'Charlie', 35)
"""
- // 1. Matched-Only
+ // 2. Matched-Only
def q_matched_only = "qt_${format}_matched_only"
"${q_matched_only}" """
MERGE INTO ${formatTableName} t
@@ -128,7 +158,7 @@ suite("test_iceberg_merge_into_advanced",
"p0,external,iceberg,external_docker,e
def q_check_m1 = "order_qt_${format}_check_m1"
"${q_check_m1}" """SELECT * FROM ${formatTableName}"""
- // 2. Not-Matched-Only
+ // 3. Not-Matched-Only
def q_not_matched_only = "qt_${format}_not_matched_only"
"${q_not_matched_only}" """
MERGE INTO ${formatTableName} t
@@ -141,7 +171,7 @@ suite("test_iceberg_merge_into_advanced",
"p0,external,iceberg,external_docker,e
def q_check_m2 = "order_qt_${format}_check_m2"
"${q_check_m2}" """SELECT * FROM ${formatTableName}"""
- // 3. Multiple MATCHED WHEN clauses
+ // 4. Multiple MATCHED WHEN clauses
def q_multi_when = "qt_${format}_multi_when"
"${q_multi_when}" """
MERGE INTO ${formatTableName} t
@@ -157,7 +187,7 @@ suite("test_iceberg_merge_into_advanced",
"p0,external,iceberg,external_docker,e
def q_check_m3 = "order_qt_${format}_check_m3"
"${q_check_m3}" """SELECT * FROM ${formatTableName}"""
- // 4. Schema Evolution test with MERGE INTO
+ // 5. Schema Evolution test with MERGE INTO
sql """ALTER TABLE ${formatTableName} ADD COLUMN c_new INT"""
def q_schema_ev_merge = "qt_${format}_schema_ev_merge"
"${q_schema_ev_merge}" """
@@ -174,7 +204,7 @@ suite("test_iceberg_merge_into_advanced",
"p0,external,iceberg,external_docker,e
def q_check_m4 = "order_qt_${format}_check_m4"
"${q_check_m4}" """SELECT * FROM ${formatTableName}"""
- // 5. Subqueries
+ // 6. Subqueries
def q_subquery_upd = "qt_${format}_subquery_upd"
"${q_subquery_upd}" """
MERGE INTO ${formatTableName} t
@@ -192,7 +222,7 @@ suite("test_iceberg_merge_into_advanced",
"p0,external,iceberg,external_docker,e
def q_check_subqueries = "order_qt_${format}_check_subqueries"
"${q_check_subqueries}" """SELECT * FROM ${formatTableName}"""
- // 6. Complex Expressions
+ // 7. Complex Expressions
def q_expr_upd = "qt_${format}_expr_upd"
"${q_expr_upd}" """
MERGE INTO ${formatTableName} t
@@ -203,7 +233,7 @@ suite("test_iceberg_merge_into_advanced",
"p0,external,iceberg,external_docker,e
def q_check_expr = "order_qt_${format}_check_expr"
"${q_check_expr}" """SELECT * FROM ${formatTableName}"""
- // 7. Schema Evolution part 2 (Drop column)
+ // 8. Schema Evolution part 2 (Drop column)
sql """ALTER TABLE ${formatTableName} DROP COLUMN age"""
def q_schema_ev_upd2 = "qt_${format}_schema_ev_upd2"
"${q_schema_ev_upd2}" """
@@ -215,7 +245,7 @@ suite("test_iceberg_merge_into_advanced",
"p0,external,iceberg,external_docker,e
def q_check_schema2 = "order_qt_${format}_schema_ev_check2"
"${q_check_schema2}" """SELECT * FROM ${formatTableName}"""
- // 8. Concurrent Conflict Detection (Best Effort)
+ // 9. Concurrent Conflict Detection (Best Effort)
sql """INSERT INTO ${formatTableName} (id, name, c_new) VALUES (6,
'Frank', 100)"""
def future1 = thread {
try {
diff --git
a/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_merge_into_basic.groovy
b/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_merge_into_basic.groovy
index dbffa23e6be..c7307df3da1 100644
---
a/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_merge_into_basic.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_merge_into_basic.groovy
@@ -62,7 +62,10 @@ suite("test_iceberg_merge_into_basic",
"p0,external,iceberg,external_docker,exte
) ENGINE=iceberg
PROPERTIES (
"format-version" = "2",
- "write.format.default" = "${format}"
+ "write.format.default" = "${format}",
+ "write.delete.mode" = "merge-on-read",
+ "write.update.mode" = "merge-on-read",
+ "write.merge.mode" = "merge-on-read"
)
"""
@@ -123,7 +126,10 @@ suite("test_iceberg_merge_into_basic",
"p0,external,iceberg,external_docker,exte
PARTITION BY LIST (DAY(dt)) ()
PROPERTIES (
"format-version" = "2",
- "write.format.default" = "${format}"
+ "write.format.default" = "${format}",
+ "write.delete.mode" = "merge-on-read",
+ "write.update.mode" = "merge-on-read",
+ "write.merge.mode" = "merge-on-read"
)
"""
diff --git
a/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_update_delete_advanced.groovy
b/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_update_delete_advanced.groovy
index 3c78f32ccf4..8a1ab045fb1 100644
---
a/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_update_delete_advanced.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_update_delete_advanced.groovy
@@ -77,7 +77,10 @@ suite("test_iceberg_update_delete_advanced",
"p0,external,iceberg,external_docke
) ENGINE=iceberg
PROPERTIES (
"format-version" = "1",
- "write.format.default" = "${format}"
+ "write.format.default" = "${format}",
+ "write.delete.mode" = "merge-on-read",
+ "write.update.mode" = "merge-on-read",
+ "write.merge.mode" = "merge-on-read"
)
"""
sql """INSERT INTO ${v1TableName} VALUES (1, 'A')"""
@@ -92,6 +95,30 @@ suite("test_iceberg_update_delete_advanced",
"p0,external,iceberg,external_docke
}
sql """drop table if exists ${v1TableName}"""
+ // 2. Error handling for copy-on-write tables
+ String cowTableName = "${tableName}_cow_${format}"
+ sql """drop table if exists ${cowTableName}"""
+ sql """
+ CREATE TABLE ${cowTableName} (
+ id INT,
+ name STRING
+ ) ENGINE=iceberg
+ PROPERTIES (
+ "format-version" = "2",
+ "write.format.default" = "${format}",
+ "write.delete.mode" = "copy-on-write",
+ "write.update.mode" = "copy-on-write"
+ )
+ """
+ test {
+ sql """DELETE FROM ${cowTableName} WHERE id = 1"""
+ exception "Doris does not support DELETE on Iceberg copy-on-write
tables"
+ }
+ test {
+ sql """UPDATE ${cowTableName} SET name = 'B' WHERE id = 1"""
+ exception "Doris does not support UPDATE on Iceberg copy-on-write
tables"
+ }
+
// Main table for advanced operations
String formatTableName = "${tableName}_${format}"
sql """drop table if exists ${formatTableName}"""
@@ -103,7 +130,10 @@ suite("test_iceberg_update_delete_advanced",
"p0,external,iceberg,external_docke
) ENGINE=iceberg
PROPERTIES (
"format-version" = "2",
- "write.format.default" = "${format}"
+ "write.format.default" = "${format}",
+ "write.delete.mode" = "merge-on-read",
+ "write.update.mode" = "merge-on-read",
+ "write.merge.mode" = "merge-on-read"
)
"""
@@ -116,7 +146,7 @@ suite("test_iceberg_update_delete_advanced",
"p0,external,iceberg,external_docke
(5, 'Eve', 45)
"""
- // 2. Subqueries
+ // 3. Subqueries
def q_subquery_upd = "qt_${format}_subquery_upd"
"${q_subquery_upd}" """
UPDATE ${formatTableName}
@@ -133,7 +163,7 @@ suite("test_iceberg_update_delete_advanced",
"p0,external,iceberg,external_docke
def q_check_subqueries = "order_qt_${format}_check_subqueries"
"${q_check_subqueries}" """SELECT * FROM ${formatTableName}"""
- // 3. Complex Expressions
+ // 4. Complex Expressions
def q_expr_upd = "qt_${format}_expr_upd"
"${q_expr_upd}" """
UPDATE ${formatTableName}
@@ -143,7 +173,7 @@ suite("test_iceberg_update_delete_advanced",
"p0,external,iceberg,external_docke
def q_check_expr = "order_qt_${format}_check_expr"
"${q_check_expr}" """SELECT * FROM ${formatTableName}"""
- // 4. Schema Evolution
+ // 5. Schema Evolution
sql """ALTER TABLE ${formatTableName} ADD COLUMN c_new INT"""
sql """INSERT INTO ${formatTableName} VALUES (6, 'Frank', 50, 100)"""
@@ -163,7 +193,7 @@ suite("test_iceberg_update_delete_advanced",
"p0,external,iceberg,external_docke
def q_schema_ev_check2 = "order_qt_${format}_schema_ev_check2"
"${q_schema_ev_check2}" """SELECT * FROM ${formatTableName}"""
- // 5. Concurrent Conflict Detection (Best Effort)
+ // 6. Concurrent Conflict Detection (Best Effort)
// We will launch two concurrent updates. We just expect the backend
to not crash.
// It might succeed fully if optimistic concurrency is robust, or
throw transaction conflict.
def future1 = thread {
diff --git
a/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_update_delete_basic.groovy
b/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_update_delete_basic.groovy
index a442eb8ecc2..21384862153 100644
---
a/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_update_delete_basic.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/dml/test_iceberg_update_delete_basic.groovy
@@ -62,7 +62,10 @@ suite("test_iceberg_update_delete_basic",
"p0,external,iceberg,external_docker,e
) ENGINE=iceberg
PROPERTIES (
"format-version" = "2",
- "write.format.default" = "${format}"
+ "write.format.default" = "${format}",
+ "write.delete.mode" = "merge-on-read",
+ "write.update.mode" = "merge-on-read",
+ "write.merge.mode" = "merge-on-read"
)
"""
@@ -110,7 +113,10 @@ suite("test_iceberg_update_delete_basic",
"p0,external,iceberg,external_docker,e
PARTITION BY LIST (DAY(dt)) ()
PROPERTIES (
"format-version" = "2",
- "write.format.default" = "${format}"
+ "write.format.default" = "${format}",
+ "write.delete.mode" = "merge-on-read",
+ "write.update.mode" = "merge-on-read",
+ "write.merge.mode" = "merge-on-read"
)
"""
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_update_delete_merge.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_update_delete_merge.groovy
index 4bce7387f86..6e11b651b07 100644
---
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_update_delete_merge.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_update_delete_merge.groovy
@@ -147,7 +147,10 @@ suite("test_iceberg_v3_row_lineage_update_delete_merge",
"p0,external,iceberg,ex
) engine=iceberg
properties (
"format-version" = "3",
- "write.format.default" = "${format}"
+ "write.format.default" = "${format}",
+ "write.delete.mode" = "merge-on-read",
+ "write.update.mode" = "merge-on-read",
+ "write.merge.mode" = "merge-on-read"
)
"""
@@ -212,7 +215,10 @@ suite("test_iceberg_v3_row_lineage_update_delete_merge",
"p0,external,iceberg,ex
partition by list (day(dt)) ()
properties (
"format-version" = "3",
- "write.format.default" = "${format}"
+ "write.format.default" = "${format}",
+ "write.delete.mode" = "merge-on-read",
+ "write.update.mode" = "merge-on-read",
+ "write.merge.mode" = "merge-on-read"
)
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]