This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 49a51e1dd6 [iceberg] Avoid partition evolution when partition fieldId 
is not 0 (#7186)
49a51e1dd6 is described below

commit 49a51e1dd6390933c1f2c027f093332f14574a3e
Author: Nikolay Volik <[email protected]>
AuthorDate: Mon Mar 2 03:37:24 2026 +0100

    [iceberg] Avoid partition evolution when partition fieldId is not 0 (#7186)
---
 docs/content/iceberg/rest-catalog.md               | 12 ++-
 .../iceberg/IcebergRestMetadataCommitter.java      | 63 +++++++++++---
 .../iceberg/IcebergRestMetadataCommitterTest.java  | 96 ++++++++++++++++++++++
 3 files changed, 158 insertions(+), 13 deletions(-)

diff --git a/docs/content/iceberg/rest-catalog.md 
b/docs/content/iceberg/rest-catalog.md
index 62dbc3b007..8acdd216ba 100644
--- a/docs/content/iceberg/rest-catalog.md
+++ b/docs/content/iceberg/rest-catalog.md
@@ -100,6 +100,16 @@ the query results:
 200, 20, 2
 */
 ```
+
+**Schema compatibility and Partition evolution:**
+
+Paimon and Iceberg differ fundamentally in their starting fieldId: Paimon 
starts at fieldId 0, while Iceberg starts at fieldId 1. If an Iceberg table is 
created directly from a Paimon schema, all fieldIds are shifted by +1, 
causing field disorder. However, it is possible to update the schema after 
table creation so that it starts from fieldId 0.
+
+Table creation attempts to minimize issues with fieldId disorder and partition 
evolution by choosing between two strategies:
+
+- Partition fieldId = 0: Paimon creates an empty schema first and then updates 
the schema to the actual one. Partition evolution is unavoidable.
+- Partition fieldId > 0: Paimon creates an initial dummy schema first, 
offsetting partition fields correctly, and then updates the schema to the 
actual one, avoiding partition evolution.
+
 **Note:**
 
 Paimon will firstly write iceberg metadata in a separate directory like 
hadoop-catalog, and then commit metadata to iceberg rest catalog.
@@ -108,5 +118,5 @@ If the two are incompatible, we take the metadata stored in 
the separate directo
 There are some cases when committing to iceberg rest catalog:
 1. table not exists in iceberg rest-catalog. It'll create the table in rest 
catalog first, and commit metadata.
 2. table exists in iceberg rest-catalog and is compatible with the base 
metadata stored in the separate directory. It'll directly get the table and 
commit metadata. 
-3. table exists, and isn't compatible with the base metadata stored in the 
separate directory. It'll **drop the table and recreate the table**, then 
commit metadata. 
+3. table exists, and isn't compatible with the base metadata stored in the 
separate directory. It'll **drop the table and recreate the table**, then 
commit metadata.
 
diff --git 
a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
 
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
index 119ddcd743..bb358512fe 100644
--- 
a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
+++ 
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
@@ -41,6 +43,8 @@ import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,7 +145,7 @@ public class IcebergRestMetadataCommitter implements 
IcebergMetadataCommitter {
         try {
             if (!tableExists()) {
                 LOG.info("Table {} does not exist, create it.", 
icebergTableIdentifier);
-                icebergTable = createTable();
+                icebergTable = createTable(newMetadata);
                 updatdeBuilder =
                         updatesForCorrectBase(
                                 ((BaseTable) 
icebergTable).operations().current(),
@@ -240,7 +244,7 @@ public class IcebergRestMetadataCommitter implements 
IcebergMetadataCommitter {
 
     private TableMetadata.Builder updatesForIncorrectBase(TableMetadata 
newMetadata) {
         LOG.info("the base metadata is incorrect, we'll recreate the iceberg 
table.");
-        icebergTable = recreateTable();
+        icebergTable = recreateTable(newMetadata);
         return updatesForCorrectBase(
                 ((BaseTable) icebergTable).operations().current(), 
newMetadata, true);
     }
@@ -267,15 +271,50 @@ public class IcebergRestMetadataCommitter implements 
IcebergMetadataCommitter {
         restCatalog.createNamespace(Namespace.of(icebergDatabaseName));
     }
 
-    private Table createTable() {
-        /* Here we create iceberg table with an emptySchema. This is because:
-        When creating table, fieldId in iceberg will be forced to start from 
1, while fieldId in paimon usually start from 0.
-        If we directly use the schema extracted from paimon to create iceberg 
table, the fieldId will be in disorder, and this
-        may cause incorrectness when reading by iceberg reader. So we use an 
emptySchema here, and add the corresponding
-        schemas later.
+    private Table createTable(TableMetadata newMetadata) {
+        /*
+        Handles fieldId incompatibility between Paimon (starts at 0) and 
Iceberg (starts at 1).
+
+        Direct schema conversion shifts all fieldIds by +1, causing field 
disorder. While
+        schemas can be updated post-creation to start at fieldId 0, creating 
an empty schema
+        first triggers partition evolution issues that break some query 
engines.
+
+        Strategy based on partition field position:
+        - fieldId = 0: Creates empty schema first, partition evolution 
unavoidable
+        - fieldId > 0: Creates dummy schema with offset fields and gap filling 
to preserve the partition spec
         */
-        Schema emptySchema = new Schema();
-        return restCatalog.createTable(icebergTableIdentifier, emptySchema);
+        PartitionSpec spec = newMetadata.spec();
+        boolean isPartitionedWithZeroFieldId =
+                spec.fields().stream().anyMatch(f -> f.sourceId() == 0);
+        if (spec.isUnpartitioned() || isPartitionedWithZeroFieldId) {
+            if (isPartitionedWithZeroFieldId) {
+                LOG.info(
+                        "Partition fieldId = 0. The Iceberg REST committer 
will use partition evolution to support Iceberg compatibility with the Paimon 
schema. If you want to avoid this, use a non-zero fieldId partition field");
+            }
+            Schema emptySchema = new Schema();
+            return restCatalog.createTable(icebergTableIdentifier, 
emptySchema);
+        } else {
+            LOG.info(
+                    "Partition fieldId > 0. In order to avoid partition 
evlolution, dummy schema will be created first");
+
+            int size =
+                    
spec.fields().stream().mapToInt(PartitionField::sourceId).max().orElseThrow();
+            // prefill the schema with dummy fields
+            NestedField[] columns = new NestedField[size];
+            for (int idx = 0; idx < size; idx++) {
+                int fieldId = idx + 1;
+                columns[idx] =
+                        NestedField.optional(fieldId, "f" + fieldId, 
Types.BooleanType.get());
+            }
+            // find and set partition fields with offset -1, so they align 
correctly after table
+            // creation
+            for (PartitionField f : spec.fields()) {
+                columns[f.sourceId() - 1] = 
newMetadata.schema().findField(f.sourceId());
+            }
+
+            Schema dummySchema = new Schema(columns);
+            return restCatalog.createTable(icebergTableIdentifier, 
dummySchema, spec);
+        }
     }
 
     private Table getTable() {
@@ -287,10 +326,10 @@ public class IcebergRestMetadataCommitter implements 
IcebergMetadataCommitter {
         restCatalog.dropTable(icebergTableIdentifier, false);
     }
 
-    private Table recreateTable() {
+    private Table recreateTable(TableMetadata newMetadata) {
         try {
             dropTable();
-            return createTable();
+            return createTable(newMetadata);
         } catch (Exception e) {
             throw new RuntimeException("Fail to recreate iceberg table.", e);
         }
diff --git 
a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
 
b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
index e52ac6e06b..7111726f36 100644
--- 
a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
+++ 
b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
@@ -43,6 +43,8 @@ import org.apache.paimon.types.RowType;
 
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.IcebergGenerics;
@@ -53,6 +55,8 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.rest.RESTCatalogServer;
 import org.apache.iceberg.rest.RESTServerExtension;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -139,6 +143,9 @@ public class IcebergRestMetadataCommitterTest {
                 testRecords,
                 expected,
                 Record::toString);
+
+        PartitionSpec expectedPartitionSpec = PartitionSpec.builderFor(new 
Schema()).build();
+        runPartitionSpecCompatibilityTest(expectedPartitionSpec);
     }
 
     @Test
@@ -206,6 +213,89 @@ public class IcebergRestMetadataCommitterTest {
                 testRecords,
                 expected,
                 Record::toString);
+
+        PartitionSpec expectedPartitionSpec = PartitionSpec.builderFor(new 
Schema()).build();
+        runPartitionSpecCompatibilityTest(expectedPartitionSpec);
+    }
+
+    @Test
+    public void testPartitionedPrimaryKeyTableWithNonZeroFieldId() throws 
Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {
+                            "k", "pt1", "pt2", "v1", "v2"
+                        }); // partition starts from fieldId 1
+
+        BiFunction<Integer, String, BinaryRow> binaryRow =
+                (pt1, pt2) -> {
+                    BinaryRow b = new BinaryRow(2);
+                    BinaryRowWriter writer = new BinaryRowWriter(b);
+                    writer.writeInt(0, pt1);
+                    writer.writeString(1, BinaryString.fromString(pt2));
+                    writer.complete();
+                    return b;
+                };
+
+        int numRounds = 20;
+        int numRecords = 500;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        boolean samePartitionEachRound = random.nextBoolean();
+
+        List<List<TestRecord>> testRecords = new ArrayList<>();
+        List<List<String>> expected = new ArrayList<>();
+        Map<String, String> expectedMap = new LinkedHashMap<>();
+        for (int r = 0; r < numRounds; r++) {
+            List<TestRecord> round = new ArrayList<>();
+            for (int i = 0; i < numRecords; i++) {
+                int pt1 = (random.nextInt(0, samePartitionEachRound ? 1 : 2) + 
r) % 3;
+                String pt2 = String.valueOf(random.nextInt(10, 12));
+                String k = String.valueOf(random.nextInt(0, 100));
+                int v1 = random.nextInt();
+                long v2 = random.nextLong();
+                round.add(
+                        new TestRecord(
+                                binaryRow.apply(pt1, pt2),
+                                GenericRow.of(
+                                        BinaryString.fromString(k),
+                                        pt1,
+                                        BinaryString.fromString(pt2),
+                                        v1,
+                                        v2)));
+                expectedMap.put(
+                        String.format("%s, %d, %s", k, pt1, pt2), 
String.format("%d, %d", v1, v2));
+            }
+            testRecords.add(round);
+            expected.add(
+                    expectedMap.entrySet().stream()
+                            .map(e -> String.format("Record(%s, %s)", 
e.getKey(), e.getValue()))
+                            .sorted()
+                            .collect(Collectors.toList()));
+        }
+
+        runCompatibilityTest(
+                rowType,
+                Arrays.asList("pt1", "pt2"),
+                Arrays.asList("k", "pt1", "pt2"),
+                testRecords,
+                expected,
+                Record::toString);
+
+        PartitionSpec expectedPartitionSpec =
+                PartitionSpec.builderFor(
+                                new Schema(
+                                        NestedField.required(1, "pt1", 
Types.IntegerType.get()),
+                                        NestedField.required(2, "pt2", 
Types.StringType.get())))
+                        .identity("pt1")
+                        .identity("pt2")
+                        .build();
+        runPartitionSpecCompatibilityTest(expectedPartitionSpec);
     }
 
     private void runCompatibilityTest(
@@ -256,6 +346,12 @@ public class IcebergRestMetadataCommitterTest {
         commit.close();
     }
 
+    private void runPartitionSpecCompatibilityTest(PartitionSpec expectedSpec) 
{
+        Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", 
"t"));
+        PartitionSpec spec = icebergTable.spec();
+        assertThat(spec).isEqualTo(expectedSpec);
+    }
+
     @Test
     public void testSchemaAndPropertiesChange() throws Exception {
         RowType rowType =

Reply via email to