This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 25f291673f1 [fix](fe) Reject Iceberg v3 row lineage columns (#63825)
25f291673f1 is described below
commit 25f291673f1e8e78e42677a7c6494b18afa1620a
Author: Gabriel <[email protected]>
AuthorDate: Mon Jun 29 10:33:26 2026 +0800
[fix](fe) Reject Iceberg v3 row lineage columns (#63825)
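Problem Summary: Creating an Iceberg table with format-version 3 could
accept user-defined columns named _row_id or _last_updated_sequence_number.
Doris later appends Iceberg row lineage hidden columns with the same
names, which can produce duplicate hidden-column metadata and confusing
insert/query behavior. This change validates Iceberg CREATE TABLE
definitions, including CTAS, and rejects those reserved row lineage
column names when the effective Iceberg format version is 3 or newer.
The effective version is taken from the catalog's
table-override.format-version, then the table's format-version
property, then the catalog's table-default.format-version, and
defaults to 2. Doris also no longer forces format-version=2 onto a new
table when the catalog already configures a format version.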
---
.../datasource/iceberg/IcebergMetadataOps.java | 8 ++-
.../doris/datasource/iceberg/IcebergUtils.java | 27 ++++++++
.../trees/plans/commands/info/CreateTableInfo.java | 36 +++++++++++
.../iceberg/IcebergDDLAndDMLPlanTest.java | 71 ++++++++++++++++++++++
.../datasource/iceberg/IcebergMetadataOpTest.java | 50 ++++++++++++++-
5 files changed, 190 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index bb2d1debd8b..246ad347242 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -373,10 +373,16 @@ public class IcebergMetadataOps implements
ExternalMetadataOps {
Schema schema = new
Schema(visit.asNestedType().asStructType().fields());
Map<String, String> properties = createTableInfo.getProperties();
properties.put(ExternalCatalog.DORIS_VERSION,
ExternalCatalog.DORIS_VERSION_VALUE);
- properties.putIfAbsent(TableProperties.FORMAT_VERSION, "2");
+ Map<String, String> catalogProperties = dorisCatalog.getProperties();
+ if (!properties.containsKey(TableProperties.FORMAT_VERSION)
+ &&
!IcebergUtils.hasIcebergCatalogFormatVersion(catalogProperties)) {
+ properties.put(TableProperties.FORMAT_VERSION, "2");
+ }
properties.putIfAbsent(TableProperties.DELETE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
properties.putIfAbsent(TableProperties.UPDATE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
properties.putIfAbsent(TableProperties.MERGE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
+ createTableInfo.validateIcebergRowLineageColumns(
+ IcebergUtils.getEffectiveIcebergFormatVersion(properties,
catalogProperties));
PartitionSpec partitionSpec =
IcebergUtils.solveIcebergPartitionSpec(createTableInfo.getPartitionDesc(),
schema);
// Build and create table with optional sort order
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index ea36d755cfd..92c6171b669 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -189,6 +189,33 @@ public class IcebergUtils {
private static final Pattern SNAPSHOT_ID = Pattern.compile("\\d+");
+ /**
+ * Returns whether the Iceberg catalog properties pin a format version for newly created tables,
+ * via {@code CatalogProperties.TABLE_OVERRIDE_PREFIX + TableProperties.FORMAT_VERSION} or
+ * {@code CatalogProperties.TABLE_DEFAULT_PREFIX + TableProperties.FORMAT_VERSION}.
+ *
+ * <p>Only the presence of either key is checked; the value is not parsed here. IcebergMetadataOps
+ * uses this to decide whether it may fill in its own {@code format-version} default.
+ *
+ * @param catalogProperties properties of the Doris Iceberg catalog
+ * @return true if either catalog-level format-version key is present
+ */
+ public static boolean hasIcebergCatalogFormatVersion(Map<String, String>
catalogProperties) {
+ return
catalogProperties.containsKey(CatalogProperties.TABLE_OVERRIDE_PREFIX +
TableProperties.FORMAT_VERSION)
+ ||
catalogProperties.containsKey(CatalogProperties.TABLE_DEFAULT_PREFIX
+ + TableProperties.FORMAT_VERSION);
+ }
+
+ /**
+ * Resolves the Iceberg format version that a new table will effectively use.
+ *
+ * <p>Precedence, highest first: the catalog's
+ * {@code TABLE_OVERRIDE_PREFIX + FORMAT_VERSION} property, the table's own
+ * {@code FORMAT_VERSION} property, the catalog's {@code TABLE_DEFAULT_PREFIX + FORMAT_VERSION}
+ * property, and finally 2.
+ *
+ * @param tableProperties properties supplied with the CREATE TABLE statement
+ * @param catalogProperties properties of the Doris Iceberg catalog
+ * @return the resolved format version, or 2 if none is set or the value is not an integer
+ */
+ public static int getEffectiveIcebergFormatVersion(Map<String, String>
tableProperties,
+ Map<String, String> catalogProperties) {
+ // A catalog-level override wins over anything the statement specifies.
+ String formatVersion =
catalogProperties.get(CatalogProperties.TABLE_OVERRIDE_PREFIX
+ + TableProperties.FORMAT_VERSION);
+ if (formatVersion == null) {
+ // Next, the statement's own format-version property.
+ formatVersion =
tableProperties.get(TableProperties.FORMAT_VERSION);
+ // Last, the catalog-level default.
+ if (formatVersion == null) {
+ formatVersion =
catalogProperties.get(CatalogProperties.TABLE_DEFAULT_PREFIX
+ + TableProperties.FORMAT_VERSION);
+ }
+ }
+ if (formatVersion == null) {
+ // Nothing configured anywhere: same "2" that IcebergMetadataOps writes by default.
+ return 2;
+ }
+ try {
+ return Integer.parseInt(formatVersion);
+ } catch (NumberFormatException ignored) {
+ // NOTE(review): an unparseable value (including one with surrounding whitespace) silently
+ // falls back to 2, which skips the v3 row lineage check; presumably Iceberg rejects such
+ // a value when the table is actually created -- confirm.
+ return 2;
+ }
+ }
+
public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
if (expr == null) {
return null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 14869e7925c..49e681cde11 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -49,6 +49,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -380,6 +381,9 @@ public class CreateTableInfo {
+ " Make sure 'engine' type is specified when use the
catalog: " + ctlName);
}
}
+ if (Strings.isNullOrEmpty(ctlName)) {
+ return;
+ }
CatalogIf catalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(ctlName);
if (catalog instanceof HMSExternalCatalog &&
!engineName.equals(ENGINE_HIVE)) {
throw new AnalysisException("Hms type catalog can only use `hive`
engine.");
@@ -791,6 +795,10 @@ public class CreateTableInfo {
+ "and you can use 'bucket(num, column)' in
'PARTITIONED BY'.");
}
+ if (engineName.equalsIgnoreCase(ENGINE_ICEBERG)) {
+ validateIcebergRowLineageColumns();
+ }
+
// Validate Iceberg sort order columns
if (sortOrderFields != null && !sortOrderFields.isEmpty()) {
if (!engineName.equalsIgnoreCase(ENGINE_ICEBERG)) {
@@ -1105,6 +1113,34 @@ public class CreateTableInfo {
}
}
+ /**
+ * Rejects user-defined columns whose names collide with Iceberg row lineage columns
+ * (such as {@code _row_id} and {@code _last_updated_sequence_number}) when the table's
+ * format version is at least {@link IcebergUtils#ICEBERG_ROW_LINEAGE_MIN_VERSION}.
+ *
+ * <p>Reached during statement validation through the private no-argument overload, and
+ * called again by IcebergMetadataOps with the version it resolves just before creating
+ * the table.
+ *
+ * @param formatVersion effective Iceberg format version of the table being created
+ * @throws AnalysisException if any column name is reported as reserved by
+ * {@link IcebergUtils#isIcebergRowLineageColumn}
+ */
+ public void validateIcebergRowLineageColumns(int formatVersion) {
+ // Versions below the row lineage minimum have no reserved lineage columns to clash with.
+ if (formatVersion < IcebergUtils.ICEBERG_ROW_LINEAGE_MIN_VERSION) {
+ return;
+ }
+ // NOTE(review): case handling of the name match is up to
+ // IcebergUtils.isIcebergRowLineageColumn; confirm e.g. "_ROW_ID" is rejected too.
+ for (ColumnDefinition columnDef : columns) {
+ if (IcebergUtils.isIcebergRowLineageColumn(columnDef.getName())) {
+ throw new AnalysisException("Cannot create Iceberg v" +
formatVersion
+ + " table with reserved row lineage column: " +
columnDef.getName());
+ }
+ }
+ }
+
+ /**
+ * Applies {@link #validateIcebergRowLineageColumns(int)} using the format version resolved
+ * from this statement's properties and, when available, its Iceberg catalog's properties.
+ */
+ private void validateIcebergRowLineageColumns() {
+ validateIcebergRowLineageColumns(getEffectiveIcebergFormatVersion());
+ }
+
+ /**
+ * Resolves the effective Iceberg format version for this statement.
+ *
+ * <p>Catalog-level properties are consulted only when {@code ctlName} is set and resolves to an
+ * {@link IcebergExternalCatalog}; otherwise only this statement's {@code properties} count.
+ */
+ private int getEffectiveIcebergFormatVersion() {
+ CatalogIf catalog = Strings.isNullOrEmpty(ctlName) ? null
+ : Env.getCurrentEnv().getCatalogMgr().getCatalog(ctlName);
+ // instanceof is false for null, covering both an empty name and a lookup that returns null.
+ if (catalog instanceof IcebergExternalCatalog) {
+ return IcebergUtils.getEffectiveIcebergFormatVersion(properties,
catalog.getProperties());
+ }
+ return IcebergUtils.getEffectiveIcebergFormatVersion(properties,
Collections.emptyMap());
+ }
+
/**
* analyzeEngine
*/
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
index 143438a71bb..dc344b08e13 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
@@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
@@ -273,6 +274,76 @@ public class IcebergDDLAndDMLPlanTest extends
TestWithFeService {
assertContainsPhysicalSink(physicalPlan,
PhysicalIcebergDeleteSink.class);
}
+ /**
+ * An explicit format-version 3 rejects both reserved row lineage column names, while an
+ * explicit format-version 2 still accepts {@code _row_id}.
+ */
+ @Test
+ public void testCreateIcebergV3TableRejectsRowLineageReservedColumn()
throws Exception {
+ useIceberg();
+ // format-version 3: a user column named _row_id must fail validation.
+ String rowIdTable = "row_lineage_reserved_" +
UUID.randomUUID().toString().replace("-", "");
+ String rowIdSql = "create table " + rowIdTable
+ + " (_row_id bigint) properties('format-version'='3')";
+ LogicalPlan rowIdPlan = parseStmt(rowIdSql);
+ Assertions.assertTrue(rowIdPlan instanceof CreateTableCommand);
+
Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class,
+ () -> ((CreateTableCommand)
rowIdPlan).getCreateTableInfo().validate(connectContext));
+
+ // format-version 3: _last_updated_sequence_number is reserved as well.
+ String lastUpdatedSequenceNumberTable =
+ "row_lineage_reserved_" +
UUID.randomUUID().toString().replace("-", "");
+ String lastUpdatedSequenceNumberSql = "create table " +
lastUpdatedSequenceNumberTable
+ + " (_last_updated_sequence_number bigint)
properties('format-version'='3')";
+ LogicalPlan lastUpdatedSequenceNumberPlan =
parseStmt(lastUpdatedSequenceNumberSql);
+ Assertions.assertTrue(lastUpdatedSequenceNumberPlan instanceof
CreateTableCommand);
+
Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class,
+ () -> ((CreateTableCommand)
lastUpdatedSequenceNumberPlan).getCreateTableInfo()
+ .validate(connectContext));
+
+ // format-version 2 is below the row lineage minimum, so the same name is accepted.
+ String formatV2Table = "row_lineage_reserved_" +
UUID.randomUUID().toString().replace("-", "");
+ String formatV2Sql = "create table " + formatV2Table
+ + " (_row_id bigint) properties('format-version'='2')";
+ LogicalPlan formatV2Plan = parseStmt(formatV2Sql);
+ Assertions.assertTrue(formatV2Plan instanceof CreateTableCommand);
+ Assertions.assertDoesNotThrow(
+ () -> ((CreateTableCommand)
formatV2Plan).getCreateTableInfo().validate(connectContext));
+ }
+
+ /**
+ * A catalog-level {@code table-default.format-version} of 3 applies when the statement omits
+ * format-version, while an explicit format-version 2 on the statement still accepts
+ * {@code _row_id}.
+ */
+ @Test
+ public void
testCreateIcebergDefaultV3TableRejectsRowLineageReservedColumn() throws
Exception {
+ useIceberg();
+ IcebergExternalCatalog catalog = (IcebergExternalCatalog)
Env.getCurrentEnv()
+ .getCatalogMgr().getCatalog(catalogName);
+ // Make v3 the catalog-level default; the finally block removes it again.
+
catalog.getCatalogProperty().addProperty("table-default.format-version", "3");
+ try {
+ // No table-level format-version: the catalog default (3) applies.
+ String rowIdTable = "row_lineage_reserved_" +
UUID.randomUUID().toString().replace("-", "");
+ String rowIdSql = "create table " + rowIdTable + " (_row_id
bigint)";
+ LogicalPlan rowIdPlan = parseStmt(rowIdSql);
+ Assertions.assertTrue(rowIdPlan instanceof CreateTableCommand);
+
Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class,
+ () -> ((CreateTableCommand)
rowIdPlan).getCreateTableInfo().validate(connectContext));
+ // And no table by that name exists in the Iceberg catalog.
+
Assertions.assertFalse(catalog.getCatalog().tableExists(TableIdentifier.of(dbName,
rowIdTable)));
+
+ // An explicit table-level format-version 2 takes precedence over the catalog default.
+ String formatV2Table = "row_lineage_reserved_" +
UUID.randomUUID().toString().replace("-", "");
+ String formatV2Sql = "create table " + formatV2Table
+ + " (_row_id bigint) properties('format-version'='2')";
+ LogicalPlan formatV2Plan = parseStmt(formatV2Sql);
+ Assertions.assertTrue(formatV2Plan instanceof CreateTableCommand);
+ Assertions.assertDoesNotThrow(
+ () -> ((CreateTableCommand)
formatV2Plan).getCreateTableInfo().validate(connectContext));
+ } finally {
+
catalog.getCatalogProperty().deleteProperty("table-default.format-version");
+ }
+ }
+
+ /**
+ * CTAS into a format-version 3 table must reject a query output column aliased to
+ * {@code _row_id}.
+ */
+ @Test
+ public void testIcebergV3CtasRejectsRowLineageReservedColumn() throws
Exception {
+ useIceberg();
+ String ctasTable = "row_lineage_reserved_" +
UUID.randomUUID().toString().replace("-", "");
+ String ctasSql = "create table " + ctasTable
+ + " properties('format-version'='3') as select 1 as _row_id";
+ LogicalPlan ctasPlan = parseStmt(ctasSql);
+ Assertions.assertTrue(ctasPlan instanceof CreateTableCommand);
+ // The reserved name comes from the query's output column, so CTAS validation must catch it.
+
Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class,
+ () -> ((CreateTableCommand)
ctasPlan).validateCreateTableAsSelect(
+ connectContext, ((CreateTableCommand)
ctasPlan).getCtasQuery().get()));
+ }
+
@Test
public void testIcebergUpdatePlans() throws Exception {
useIceberg();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergMetadataOpTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergMetadataOpTest.java
index 079a0c9c312..b72a9ffba20 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergMetadataOpTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergMetadataOpTest.java
@@ -17,6 +17,8 @@
package org.apache.doris.datasource.iceberg;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalDatabase;
@@ -28,7 +30,12 @@ import org.apache.doris.filesystem.FileIterator;
import org.apache.doris.filesystem.FileSystem;
import org.apache.doris.filesystem.Location;
import org.apache.doris.fs.MemoryFileSystem;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -36,6 +43,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.IOException;
@@ -129,6 +137,42 @@ public class IcebergMetadataOpTest {
Assert.assertEquals(Collections.singletonList("DORIS_HORIZON_T"),
tableNames);
}
+ /**
+ * When the catalog sets {@code table-default.format-version}, performCreateTable must not inject
+ * its own format-version, and it must run the row lineage check with the catalog's version (3).
+ */
+ @Test
+ public void testPerformCreateTableRespectsCatalogDefaultFormatVersion()
throws Exception {
+ // Catalog-level default of v3; the statement itself carries no format-version.
+ Map<String, String> catalogProps = new HashMap<>();
+ catalogProps.put(CatalogProperties.TABLE_DEFAULT_PREFIX +
TableProperties.FORMAT_VERSION, "3");
+ IcebergExternalCatalog dorisCatalog = mockHmsCatalog(catalogProps);
+ Catalog icebergCatalog = Mockito.mock(Catalog.class,
+
Mockito.withSettings().extraInterfaces(SupportsNamespaces.class));
+ IcebergMetadataOps ops = new IcebergMetadataOps(dorisCatalog,
icebergCatalog);
+
+ ExternalDatabase<?> dorisDb = Mockito.mock(ExternalDatabase.class);
+ Mockito.when(dorisDb.getRemoteName()).thenReturn("db");
+ Mockito.when(dorisDb.getTableNullable("tbl")).thenReturn(null);
+ Mockito.doReturn(dorisDb).when(dorisCatalog).getDbNullable("db");
+ Mockito.when(dorisCatalog.getName()).thenReturn("iceberg_catalog");
+ Mockito.when(icebergCatalog.tableExists(TableIdentifier.of("db",
"tbl"))).thenReturn(false);
+
+ // Mocked statement: one plain column and an empty, mutable property map.
+ CreateTableInfo createTableInfo = Mockito.mock(CreateTableInfo.class);
+ Map<String, String> tableProps = new HashMap<>();
+ Mockito.when(createTableInfo.getDbName()).thenReturn("db");
+ Mockito.when(createTableInfo.getTableName()).thenReturn("tbl");
+ Mockito.when(createTableInfo.isIfNotExists()).thenReturn(false);
+
Mockito.when(createTableInfo.getColumns()).thenReturn(Collections.singletonList(
+ new Column("id", Type.INT, true)));
+ Mockito.when(createTableInfo.getProperties()).thenReturn(tableProps);
+
+ ops.performCreateTable(createTableInfo);
+
+ // On this mock the validation call is only recorded; we check it received the catalog's version.
+ Mockito.verify(createTableInfo).validateIcebergRowLineageColumns(3);
+ // Raw Map.class: this assignment is an unchecked conversion.
+ ArgumentCaptor<Map<String, String>> propsCaptor =
ArgumentCaptor.forClass(Map.class);
+
Mockito.verify(icebergCatalog).createTable(Mockito.eq(TableIdentifier.of("db",
"tbl")),
+ Mockito.any(Schema.class), Mockito.any(PartitionSpec.class),
propsCaptor.capture());
+ // No table-level format-version was injected, so the catalog table-default (3) stays effective.
+
Assert.assertFalse(propsCaptor.getValue().containsKey(TableProperties.FORMAT_VERSION));
+ Assert.assertEquals(3, IcebergUtils.getEffectiveIcebergFormatVersion(
+ propsCaptor.getValue(), catalogProps));
+ }
+
@Test
public void testDropTableCleansEmptyTableLocation() throws Exception {
MemoryFileSystem fs = new MemoryFileSystem();
@@ -214,10 +258,14 @@ public class IcebergMetadataOpTest {
}
private IcebergExternalCatalog mockHmsCatalog() {
+ return mockHmsCatalog(Collections.emptyMap());
+ }
+
+ private IcebergExternalCatalog mockHmsCatalog(Map<String, String>
catalogProperties) {
IcebergExternalCatalog dorisCatalog =
Mockito.mock(IcebergExternalCatalog.class);
Mockito.when(dorisCatalog.getExecutionAuthenticator()).thenReturn(new
ExecutionAuthenticator() {
});
-
Mockito.when(dorisCatalog.getProperties()).thenReturn(Collections.emptyMap());
+
Mockito.when(dorisCatalog.getProperties()).thenReturn(catalogProperties);
Mockito.when(dorisCatalog.getIcebergCatalogType()).thenReturn(IcebergExternalCatalog.ICEBERG_HMS);
Mockito.when(dorisCatalog.getCatalogProperty()).thenReturn(new
CatalogProperty(null, Collections.emptyMap()));
return dorisCatalog;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]