This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 49a51e1dd6 [iceberg] Avoid partition evolution when partition fieldId
is not 0 (#7186)
49a51e1dd6 is described below
commit 49a51e1dd6390933c1f2c027f093332f14574a3e
Author: Nikolay Volik <[email protected]>
AuthorDate: Mon Mar 2 03:37:24 2026 +0100
[iceberg] Avoid partition evolution when partition fieldId is not 0 (#7186)
---
docs/content/iceberg/rest-catalog.md | 12 ++-
.../iceberg/IcebergRestMetadataCommitter.java | 63 +++++++++++---
.../iceberg/IcebergRestMetadataCommitterTest.java | 96 ++++++++++++++++++++++
3 files changed, 158 insertions(+), 13 deletions(-)
diff --git a/docs/content/iceberg/rest-catalog.md
b/docs/content/iceberg/rest-catalog.md
index 62dbc3b007..8acdd216ba 100644
--- a/docs/content/iceberg/rest-catalog.md
+++ b/docs/content/iceberg/rest-catalog.md
@@ -100,6 +100,16 @@ the query results:
200, 20, 2
*/
```
+
+**Schema compatibility and Partition evolution:**
+
+There is a fundamental difference between Paimon and Iceberg regarding the
starting fieldId. Paimon uses fieldId 0, while Iceberg uses fieldId 1. If we
create an Iceberg table using a Paimon schema directly, it will shift all
fieldIds by +1, causing field disorder. However, it is possible to update the
schema after table creation and start the schema from fieldId 0.
+
+Table creation attempts to minimize issues with fieldId disorder and partition
evolution by following a two-option logic:
+
+- Partition fieldId = 0: Paimon creates an empty schema first and then updates
the schema to the actual one. Partition evolution is unavoidable.
+- Partition fieldId > 0: Paimon creates an initial dummy schema first,
offsetting partition fields correctly, and then updates the schema to the
actual one, avoiding partition evolution.
+
**Note:**
Paimon will firstly write iceberg metadata in a separate directory like
hadoop-catalog, and then commit metadata to iceberg rest catalog.
@@ -108,5 +118,5 @@ If the two are incompatible, we take the metadata stored in
the separate directo
There are some cases when committing to iceberg rest catalog:
1. table not exists in iceberg rest-catalog. It'll create the table in rest
catalog first, and commit metadata.
2. table exists in iceberg rest-catalog and is compatible with the base
metadata stored in the separate directory. It'll directly get the table and
commit metadata.
-3. table exists, and isn't compatible with the base metadata stored in the
separate directory. It'll **drop the table and recreate the table**, then
commit metadata.
+3. table exists, and isn't compatible with the base metadata stored in the
separate directory. It'll **drop the table and recreate the table**, then
commit metadata.
diff --git
a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
index 119ddcd743..bb358512fe 100644
---
a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
+++
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
@@ -41,6 +43,8 @@ import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,7 +145,7 @@ public class IcebergRestMetadataCommitter implements
IcebergMetadataCommitter {
try {
if (!tableExists()) {
LOG.info("Table {} does not exist, create it.",
icebergTableIdentifier);
- icebergTable = createTable();
+ icebergTable = createTable(newMetadata);
updatdeBuilder =
updatesForCorrectBase(
((BaseTable)
icebergTable).operations().current(),
@@ -240,7 +244,7 @@ public class IcebergRestMetadataCommitter implements
IcebergMetadataCommitter {
private TableMetadata.Builder updatesForIncorrectBase(TableMetadata
newMetadata) {
LOG.info("the base metadata is incorrect, we'll recreate the iceberg
table.");
- icebergTable = recreateTable();
+ icebergTable = recreateTable(newMetadata);
return updatesForCorrectBase(
((BaseTable) icebergTable).operations().current(),
newMetadata, true);
}
@@ -267,15 +271,50 @@ public class IcebergRestMetadataCommitter implements
IcebergMetadataCommitter {
restCatalog.createNamespace(Namespace.of(icebergDatabaseName));
}
- private Table createTable() {
- /* Here we create iceberg table with an emptySchema. This is because:
- When creating table, fieldId in iceberg will be forced to start from
1, while fieldId in paimon usually start from 0.
- If we directly use the schema extracted from paimon to create iceberg
table, the fieldId will be in disorder, and this
- may cause incorrectness when reading by iceberg reader. So we use an
emptySchema here, and add the corresponding
- schemas later.
+ private Table createTable(TableMetadata newMetadata) {
+ /*
+ Handles fieldId incompatibility between Paimon (starts at 0) and
Iceberg (starts at 1).
+
+ Direct schema conversion shifts all fieldIds by +1, causing field
disorder. While
+ schemas can be updated post-creation to start at fieldId 0, creating
an empty schema
+ first triggers partition evolution issues that break some query
engines.
+
+ Strategy based on partition field position:
+ - fieldId = 0: Creates empty schema first, partition evolution
unavoidable
+ - fieldId > 0: Creates dummy schema with offset fields and gap filling
to preserve the partition spec
*/
- Schema emptySchema = new Schema();
- return restCatalog.createTable(icebergTableIdentifier, emptySchema);
+ PartitionSpec spec = newMetadata.spec();
+ boolean isPartitionedWithZeroFieldId =
+ spec.fields().stream().anyMatch(f -> f.sourceId() == 0);
+ if (spec.isUnpartitioned() || isPartitionedWithZeroFieldId) {
+ if (isPartitionedWithZeroFieldId) {
+ LOG.info(
+ "Partition fieldId = 0. The Iceberg REST committer
will use partition evolution to support Iceberg compatibility with the Paimon
schema. If you want to avoid this, use a non-zero fieldId partition field");
+ }
+ Schema emptySchema = new Schema();
+ return restCatalog.createTable(icebergTableIdentifier,
emptySchema);
+ } else {
+            LOG.info(
+                    "Partition fieldId > 0. In order to avoid partition
evolution, dummy schema will be created first");
+
+ int size =
+
spec.fields().stream().mapToInt(PartitionField::sourceId).max().orElseThrow();
+ // prefill the schema with dummy fields
+ NestedField[] columns = new NestedField[size];
+ for (int idx = 0; idx < size; idx++) {
+ int fieldId = idx + 1;
+ columns[idx] =
+ NestedField.optional(fieldId, "f" + fieldId,
Types.BooleanType.get());
+ }
+ // find and set partition fields with offset -1, so they align
correctly after table
+ // creation
+ for (PartitionField f : spec.fields()) {
+ columns[f.sourceId() - 1] =
newMetadata.schema().findField(f.sourceId());
+ }
+
+ Schema dummySchema = new Schema(columns);
+ return restCatalog.createTable(icebergTableIdentifier,
dummySchema, spec);
+ }
}
private Table getTable() {
@@ -287,10 +326,10 @@ public class IcebergRestMetadataCommitter implements
IcebergMetadataCommitter {
restCatalog.dropTable(icebergTableIdentifier, false);
}
- private Table recreateTable() {
+ private Table recreateTable(TableMetadata newMetadata) {
try {
dropTable();
- return createTable();
+ return createTable(newMetadata);
} catch (Exception e) {
throw new RuntimeException("Fail to recreate iceberg table.", e);
}
diff --git
a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
index e52ac6e06b..7111726f36 100644
---
a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
+++
b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
@@ -43,6 +43,8 @@ import org.apache.paimon.types.RowType;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
@@ -53,6 +55,8 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.RESTCatalogServer;
import org.apache.iceberg.rest.RESTServerExtension;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -139,6 +143,9 @@ public class IcebergRestMetadataCommitterTest {
testRecords,
expected,
Record::toString);
+
+ PartitionSpec expectedPartitionSpec = PartitionSpec.builderFor(new
Schema()).build();
+ runPartitionSpecCompatibilityTest(expectedPartitionSpec);
}
@Test
@@ -206,6 +213,89 @@ public class IcebergRestMetadataCommitterTest {
testRecords,
expected,
Record::toString);
+
+ PartitionSpec expectedPartitionSpec = PartitionSpec.builderFor(new
Schema()).build();
+ runPartitionSpecCompatibilityTest(expectedPartitionSpec);
+ }
+
+ @Test
+ public void testPartitionedPrimaryKeyTableWithNonZeroFieldId() throws
Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.BIGINT()
+ },
+ new String[] {
+ "k", "pt1", "pt2", "v1", "v2"
+ }); // partition starts from fieldId 1
+
+ BiFunction<Integer, String, BinaryRow> binaryRow =
+ (pt1, pt2) -> {
+ BinaryRow b = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(b);
+ writer.writeInt(0, pt1);
+ writer.writeString(1, BinaryString.fromString(pt2));
+ writer.complete();
+ return b;
+ };
+
+ int numRounds = 20;
+ int numRecords = 500;
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ boolean samePartitionEachRound = random.nextBoolean();
+
+ List<List<TestRecord>> testRecords = new ArrayList<>();
+ List<List<String>> expected = new ArrayList<>();
+ Map<String, String> expectedMap = new LinkedHashMap<>();
+ for (int r = 0; r < numRounds; r++) {
+ List<TestRecord> round = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ int pt1 = (random.nextInt(0, samePartitionEachRound ? 1 : 2) +
r) % 3;
+ String pt2 = String.valueOf(random.nextInt(10, 12));
+ String k = String.valueOf(random.nextInt(0, 100));
+ int v1 = random.nextInt();
+ long v2 = random.nextLong();
+ round.add(
+ new TestRecord(
+ binaryRow.apply(pt1, pt2),
+ GenericRow.of(
+ BinaryString.fromString(k),
+ pt1,
+ BinaryString.fromString(pt2),
+ v1,
+ v2)));
+ expectedMap.put(
+ String.format("%s, %d, %s", k, pt1, pt2),
String.format("%d, %d", v1, v2));
+ }
+ testRecords.add(round);
+ expected.add(
+ expectedMap.entrySet().stream()
+ .map(e -> String.format("Record(%s, %s)",
e.getKey(), e.getValue()))
+ .sorted()
+ .collect(Collectors.toList()));
+ }
+
+ runCompatibilityTest(
+ rowType,
+ Arrays.asList("pt1", "pt2"),
+ Arrays.asList("k", "pt1", "pt2"),
+ testRecords,
+ expected,
+ Record::toString);
+
+ PartitionSpec expectedPartitionSpec =
+ PartitionSpec.builderFor(
+ new Schema(
+ NestedField.required(1, "pt1",
Types.IntegerType.get()),
+ NestedField.required(2, "pt2",
Types.StringType.get())))
+ .identity("pt1")
+ .identity("pt2")
+ .build();
+ runPartitionSpecCompatibilityTest(expectedPartitionSpec);
}
private void runCompatibilityTest(
@@ -256,6 +346,12 @@ public class IcebergRestMetadataCommitterTest {
commit.close();
}
+ private void runPartitionSpecCompatibilityTest(PartitionSpec expectedSpec)
{
+ Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb",
"t"));
+ PartitionSpec spec = icebergTable.spec();
+ assertThat(spec).isEqualTo(expectedSpec);
+ }
+
@Test
public void testSchemaAndPropertiesChange() throws Exception {
RowType rowType =