This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a83228410 [iceberg] Enhance iceberg snapshot metadata. (#6354)
4a83228410 is described below

commit 4a83228410e6b3fd14042516b96c462a06f3fa98
Author: Jiajia Li <[email protected]>
AuthorDate: Tue Sep 30 09:43:13 2025 +0800

    [iceberg] Enhance iceberg snapshot metadata. (#6354)
---
 .../paimon/iceberg/IcebergCommitCallback.java      |  10 +-
 .../paimon/iceberg/metadata/IcebergMetadata.java   |   2 +
 .../paimon/iceberg/metadata/IcebergSnapshot.java   |  59 ++-
 .../iceberg/IcebergRestMetadataCommitter.java      |   5 +-
 .../apache/paimon/iceberg/IcebergMetadataTest.java | 554 +++++++++++++++++++++
 .../iceberg/IcebergRestMetadataCommitterTest.java  |  65 +++
 6 files changed, 689 insertions(+), 6 deletions(-)
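
For readers skimming the patch: the commit widens IcebergSnapshot with three
nullable fields (parent-snapshot-id, first-row-id, added-rows), populates
parent-snapshot-id from IcebergCommitCallback, and threads the new fields
through IcebergRestMetadataCommitter. A minimal sketch of a call to the widened
constructor, with illustrative argument values that are not taken from the
commit:

    IcebergSnapshot snapshot =
            new IcebergSnapshot(
                    2L,                            // sequence-number
                    2L,                            // snapshot-id
                    1L,                            // parent-snapshot-id; null for the first snapshot
                    System.currentTimeMillis(),    // timestamp-ms
                    IcebergSnapshotSummary.APPEND, // summary
                    "/path/to/manifest-list.avro", // manifest-list (illustrative path)
                    0,                             // schema-id
                    null,                          // first-row-id (format v3 row lineage only)
                    null);                         // added-rows (format v3 row lineage only)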

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index c8a3717025..e8f85c662f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -339,10 +339,13 @@ public class IcebergCommitCallback implements CommitCallback, TagCallback {
                 new IcebergSnapshot(
                         snapshotId,
                         snapshotId,
+                        null,
                         System.currentTimeMillis(),
                         IcebergSnapshotSummary.APPEND,
                         pathFactory.toManifestListPath(manifestListFileName).toString(),
-                        schemaId);
+                        schemaId,
+                        null,
+                        null);
 
         // Tags can only be included in Iceberg if they point to an Iceberg snapshot that
         // exists. Otherwise an Iceberg client fails to parse the metadata and all reads fail.
@@ -599,10 +602,13 @@ public class IcebergCommitCallback implements CommitCallback, TagCallback {
                 new IcebergSnapshot(
                         snapshotId,
                         snapshotId,
+                        snapshotId - 1,
                         System.currentTimeMillis(),
                         snapshotSummary,
                         pathFactory.toManifestListPath(manifestListFileName).toString(),
-                        schemaId));
+                        schemaId,
+                        null,
+                        null));
 
         // all snapshots in this list, except the last one, need to expire
         List<IcebergSnapshot> toExpireExceptLast = new ArrayList<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
index c755e32c7d..0aeef94247 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -319,6 +319,7 @@ public class IcebergMetadata {
                 defaultSortOrderId,
                 snapshots,
                 currentSnapshotId,
+                properties,
                 refs);
     }
 
@@ -347,6 +348,7 @@ public class IcebergMetadata {
                 && defaultSortOrderId == that.defaultSortOrderId
                 && Objects.equals(snapshots, that.snapshots)
                 && currentSnapshotId == that.currentSnapshotId
+                && Objects.equals(properties, that.properties)
                 && Objects.equals(refs, that.refs);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
index df0224d22b..d6a0dd8f21 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
@@ -21,8 +21,11 @@ package org.apache.paimon.iceberg.metadata;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
 import java.util.Objects;
 
 /**
@@ -35,10 +38,13 @@ public class IcebergSnapshot {
 
     private static final String FIELD_SEQUENCE_NUMBER = "sequence-number";
     private static final String FIELD_SNAPSHOT_ID = "snapshot-id";
+    private static final String FIELD_PARENT_SNAPSHOT_ID = "parent-snapshot-id";
     private static final String FIELD_TIMESTAMP_MS = "timestamp-ms";
     private static final String FIELD_SUMMARY = "summary";
     private static final String FIELD_MANIFEST_LIST = "manifest-list";
     private static final String FIELD_SCHEMA_ID = "schema-id";
+    private static final String FIELD_FIRST_ROW_ID = "first-row-id";
+    private static final String FIELD_ADDED_ROWS = "added-rows";
 
     @JsonProperty(FIELD_SEQUENCE_NUMBER)
     private final long sequenceNumber;
@@ -46,6 +52,11 @@ public class IcebergSnapshot {
     @JsonProperty(FIELD_SNAPSHOT_ID)
     private final long snapshotId;
 
+    @JsonProperty(FIELD_PARENT_SNAPSHOT_ID)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final Long parentSnapshotId;
+
     @JsonProperty(FIELD_TIMESTAMP_MS)
     private final long timestampMs;
 
@@ -58,20 +69,36 @@ public class IcebergSnapshot {
     @JsonProperty(FIELD_SCHEMA_ID)
     private final int schemaId;
 
+    @JsonProperty(FIELD_FIRST_ROW_ID)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final Long firstRowId;
+
+    @JsonProperty(FIELD_ADDED_ROWS)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final Long addedRows;
+
     @JsonCreator
     public IcebergSnapshot(
             @JsonProperty(FIELD_SEQUENCE_NUMBER) long sequenceNumber,
             @JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId,
+            @JsonProperty(FIELD_PARENT_SNAPSHOT_ID) Long parentSnapshotId,
             @JsonProperty(FIELD_TIMESTAMP_MS) long timestampMs,
             @JsonProperty(FIELD_SUMMARY) IcebergSnapshotSummary summary,
             @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
-            @JsonProperty(FIELD_SCHEMA_ID) int schemaId) {
+            @JsonProperty(FIELD_SCHEMA_ID) int schemaId,
+            @JsonProperty(FIELD_FIRST_ROW_ID) Long firstRowId,
+            @JsonProperty(FIELD_ADDED_ROWS) Long addedRows) {
         this.sequenceNumber = sequenceNumber;
         this.snapshotId = snapshotId;
+        this.parentSnapshotId = parentSnapshotId;
         this.timestampMs = timestampMs;
         this.summary = summary;
         this.manifestList = manifestList;
         this.schemaId = schemaId;
+        this.firstRowId = firstRowId;
+        this.addedRows = addedRows;
     }
 
     @JsonGetter(FIELD_SEQUENCE_NUMBER)
@@ -104,10 +131,33 @@ public class IcebergSnapshot {
         return schemaId;
     }
 
+    @JsonGetter(FIELD_PARENT_SNAPSHOT_ID)
+    public Long parentSnapshotId() {
+        return parentSnapshotId;
+    }
+
+    @JsonGetter(FIELD_ADDED_ROWS)
+    public Long addedRows() {
+        return addedRows;
+    }
+
+    @JsonGetter(FIELD_FIRST_ROW_ID)
+    public Long firstRowId() {
+        return firstRowId;
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(
-                sequenceNumber, snapshotId, timestampMs, summary, manifestList, schemaId);
+                sequenceNumber,
+                snapshotId,
+                parentSnapshotId,
+                timestampMs,
+                summary,
+                manifestList,
+                schemaId,
+                addedRows,
+                firstRowId);
     }
 
     @Override
@@ -122,9 +172,12 @@ public class IcebergSnapshot {
         IcebergSnapshot that = (IcebergSnapshot) o;
         return sequenceNumber == that.sequenceNumber
                 && snapshotId == that.snapshotId
+                && Objects.equals(parentSnapshotId, that.parentSnapshotId)
                 && timestampMs == that.timestampMs
                 && Objects.equals(summary, that.summary)
                 && Objects.equals(manifestList, that.manifestList)
-                && schemaId == that.schemaId;
+                && schemaId == that.schemaId
+                && Objects.equals(addedRows, that.addedRows)
+                && Objects.equals(firstRowId, that.firstRowId);
     }
 }
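
A note on the serialization behavior above (my reading of the annotations, not
part of the commit): the three new fields carry
@JsonInclude(JsonInclude.Include.NON_NULL), so they are omitted from the
metadata JSON whenever they are null, and pre-existing format-v2 snapshots
serialize exactly as before. A minimal round-trip sketch, assuming Paimon's
shaded jackson-databind lives under the same shade prefix as the annotation
imports:

    import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    // With parentSnapshotId, firstRowId and addedRows all null, the serialized
    // JSON contains no "parent-snapshot-id", "first-row-id" or "added-rows" keys.
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(snapshot);
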
diff --git a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
index 1fa0db2441..119ddcd743 100644
--- a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
+++ b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
@@ -374,10 +374,13 @@ public class IcebergRestMetadataCommitter implements IcebergMetadataCommitter {
                                         new IcebergSnapshot(
                                                 snapshot.sequenceNumber(),
                                                 snapshot.snapshotId(),
+                                                snapshot.parentSnapshotId(),
                                                 snapshot.timestampMs(),
                                                 snapshot.summary(),
                                                 snapshot.manifestList(),
-                                                snapshot.schemaId() + 1))
+                                                snapshot.schemaId() + 1,
+                                                snapshot.firstRowId(),
+                                                snapshot.addedRows()))
                         .collect(Collectors.toList());
         return new IcebergMetadata(
                 newIcebergMetadata.formatVersion(),
diff --git a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
new file mode 100644
index 0000000000..3fc1aad2cd
--- /dev/null
+++ b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.iceberg.metadata.IcebergSchema;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
+import org.apache.paimon.options.Options;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTCatalogServer;
+import org.apache.iceberg.rest.RESTServerExtension;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class IcebergMetadataTest {
+    @RegisterExtension
+    private static final RESTServerExtension REST_SERVER_EXTENSION =
+            new RESTServerExtension(
+                    Map.of(
+                            RESTCatalogServer.REST_PORT,
+                            RESTServerExtension.FREE_PORT,
+                            CatalogProperties.CLIENT_POOL_SIZE,
+                            "1",
+                            CatalogProperties.CATALOG_IMPL,
+                            HadoopCatalog.class.getName()));
+
+    protected static RESTCatalog restCatalog;
+
+    @BeforeEach
+    void setUp() {
+        restCatalog = REST_SERVER_EXTENSION.client();
+    }
+
+    @Test
+    @DisplayName("Test reading metadata from basic Iceberg table creation")
+    void testReadBasicIcebergTableMetadata() throws Exception {
+        // Create a basic Iceberg table
+        Table icebergTable = createBasicIcebergTable("basic_table");
+
+        // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+        // Verify basic properties
+        assertThat(paimonIcebergMetadata.formatVersion()).isEqualTo(2);
+        assertThat(paimonIcebergMetadata.tableUuid()).isNotNull();
+        assertThat(paimonIcebergMetadata.location()).isNotNull();
+        assertThat(paimonIcebergMetadata.currentSchemaId()).isEqualTo(0);
+        assertThat(paimonIcebergMetadata.defaultSpecId()).isEqualTo(0);
+        assertThat(paimonIcebergMetadata.defaultSortOrderId()).isEqualTo(0);
+
+        // Verify schema
+        assertThat(paimonIcebergMetadata.schemas()).hasSize(1);
+        IcebergSchema schema = paimonIcebergMetadata.schemas().get(0);
+        assertThat(schema.fields()).hasSize(3);
+
+        // Verify field details
+        assertThat(schema.fields().get(0).name()).isEqualTo("id");
+        assertThat(schema.fields().get(0).type()).isEqualTo("int");
+        assertThat(schema.fields().get(0).required()).isTrue();
+
+        assertThat(schema.fields().get(1).name()).isEqualTo("name");
+        assertThat(schema.fields().get(1).type()).isEqualTo("string");
+        assertThat(schema.fields().get(1).required()).isTrue();
+
+        assertThat(schema.fields().get(2).name()).isEqualTo("age");
+        assertThat(schema.fields().get(2).type()).isEqualTo("int");
+        assertThat(schema.fields().get(2).required()).isFalse();
+
+        // Verify partition specs (should be unpartitioned)
+        assertThat(paimonIcebergMetadata.partitionSpecs()).hasSize(1);
+        assertThat(paimonIcebergMetadata.partitionSpecs().get(0).fields()).isEmpty();
+
+        // Verify snapshots (should be empty initially)
+        assertThat(paimonIcebergMetadata.snapshots()).isEmpty();
+        assertThat(paimonIcebergMetadata.currentSnapshotId()).isEqualTo(-1);
+    }
+
+    @Test
+    @DisplayName("Test reading metadata from partitioned Iceberg table")
+    void testReadPartitionedIcebergTableMetadata() throws Exception {
+        // Create a partitioned Iceberg table
+        Table icebergTable = createPartitionedIcebergTable("partitioned_table");
+
+        // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+        // Verify basic properties
+        assertThat(paimonIcebergMetadata.formatVersion()).isEqualTo(2);
+        assertThat(paimonIcebergMetadata.tableUuid()).isNotNull();
+        assertThat(paimonIcebergMetadata.location()).isNotNull();
+
+        // Verify schema
+        assertThat(paimonIcebergMetadata.schemas()).hasSize(1);
+        IcebergSchema schema = paimonIcebergMetadata.schemas().get(0);
+        assertThat(schema.fields()).hasSize(4);
+
+        // Verify field details
+        assertThat(schema.fields().get(0).name()).isEqualTo("id");
+        assertThat(schema.fields().get(1).name()).isEqualTo("name");
+        assertThat(schema.fields().get(2).name()).isEqualTo("department");
+        assertThat(schema.fields().get(3).name()).isEqualTo("salary");
+
+        // Verify partition specs
+        assertThat(paimonIcebergMetadata.partitionSpecs()).hasSize(1);
+        IcebergPartitionSpec partitionSpec = paimonIcebergMetadata.partitionSpecs().get(0);
+        assertThat(partitionSpec.fields()).hasSize(1);
+        assertThat(partitionSpec.fields().get(0).name()).isEqualTo("department");
+        assertThat(partitionSpec.fields().get(0).transform()).isEqualTo("identity");
+
+        // Verify snapshots (should be empty initially)
+        assertThat(paimonIcebergMetadata.snapshots()).isEmpty();
+        assertThat(paimonIcebergMetadata.currentSnapshotId()).isEqualTo(-1);
+    }
+
+    @Test
+    @DisplayName("Test reading metadata from sorted Iceberg table")
+    void testReadSortedIcebergTableMetadata() throws Exception {
+        // Create a sorted Iceberg table
+        Table icebergTable = createSortedIcebergTable("sorted_table");
+
+        // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+        // Verify basic properties
+        assertThat(paimonIcebergMetadata.formatVersion()).isEqualTo(2);
+        assertThat(paimonIcebergMetadata.tableUuid()).isNotNull();
+        assertThat(paimonIcebergMetadata.location()).isNotNull();
+
+        // Verify schema
+        assertThat(paimonIcebergMetadata.schemas()).hasSize(1);
+        IcebergSchema schema = paimonIcebergMetadata.schemas().get(0);
+        assertThat(schema.fields()).hasSize(3);
+
+        // Verify field details
+        assertThat(schema.fields().get(0).name()).isEqualTo("id");
+        assertThat(schema.fields().get(1).name()).isEqualTo("name");
+        assertThat(schema.fields().get(2).name()).isEqualTo("score");
+
+        // Verify sort orders
+        assertThat(paimonIcebergMetadata.sortOrders()).hasSize(1);
+        assertThat(paimonIcebergMetadata.defaultSortOrderId()).isEqualTo(1);
+
+        // Verify snapshots (should be empty initially)
+        assertThat(paimonIcebergMetadata.snapshots()).isEmpty();
+        assertThat(paimonIcebergMetadata.currentSnapshotId()).isEqualTo(-1);
+    }
+
+    @Test
+    @DisplayName("Test reading metadata after Iceberg table operations")
+    void testReadMetadataAfterIcebergOperations() throws Exception {
+        // Create a basic Iceberg table
+        Table icebergTable = createBasicIcebergTable("operations_table");
+
+        // Perform first append operation
+        icebergTable
+                .newFastAppend()
+                .appendFile(
+                        DataFiles.builder(PartitionSpec.unpartitioned())
+                                .withPath("/path/to/data-a.parquet")
+                                .withFileSizeInBytes(100)
+                                .withRecordCount(10)
+                                .build())
+                .commit();
+
+        // Read metadata after first operation
+        IcebergMetadata paimonIcebergMetadata1 = readIcebergMetadata("operations_table");
+
+        // Verify snapshots after first operation
+        assertThat(paimonIcebergMetadata1.snapshots()).hasSize(1);
+        assertThat(paimonIcebergMetadata1.currentSnapshotId()).isNotNull();
+        assertThat(paimonIcebergMetadata1.snapshots().get(0).parentSnapshotId()).isNull();
+
+        // Perform second append operation
+        icebergTable
+                .newFastAppend()
+                .appendFile(
+                        DataFiles.builder(PartitionSpec.unpartitioned())
+                                .withPath("/path/to/data-b.parquet")
+                                .withFileSizeInBytes(200)
+                                .withRecordCount(20)
+                                .build())
+                .commit();
+
+        // Read metadata after second operation
+        IcebergMetadata paimonIcebergMetadata2 = readIcebergMetadata("operations_table");
+
+        // Verify snapshots after second operation
+        assertThat(paimonIcebergMetadata2.snapshots()).hasSize(2);
+        assertThat(paimonIcebergMetadata2.currentSnapshotId())
+                .isEqualTo(paimonIcebergMetadata2.snapshots().get(1).snapshotId());
+        assertThat(paimonIcebergMetadata2.snapshots().get(1).parentSnapshotId())
+                .isEqualTo(paimonIcebergMetadata2.snapshots().get(0).snapshotId());
+
+        // Verify snapshot sequence numbers
+        assertThat(paimonIcebergMetadata2.snapshots().get(0).sequenceNumber()).isEqualTo(1L);
+        assertThat(paimonIcebergMetadata2.snapshots().get(1).sequenceNumber()).isEqualTo(2L);
+    }
+
+    @Test
+    @DisplayName("Test reading metadata with Iceberg table properties")
+    void testReadMetadataWithIcebergTableProperties() throws Exception {
+        // Create Iceberg table with custom properties
+        TableIdentifier identifier = TableIdentifier.of("testdb", 
"properties_table");
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", 
Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", 
Types.StringType.get()));
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("write.format.default", "parquet");
+        properties.put("write.parquet.compression-codec", "snappy");
+        properties.put("write.target-file-size-bytes", "134217728");
+
+        Table icebergTable =
+                restCatalog.buildTable(identifier, schema).withProperties(properties).create();
+
+        // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+        // Verify properties
+        assertThat(paimonIcebergMetadata.properties()).isNotEmpty();
+        assertThat(paimonIcebergMetadata.properties().get("write.format.default"))
+                .isEqualTo("parquet");
+        assertThat(paimonIcebergMetadata.properties().get("write.parquet.compression-codec"))
+                .isEqualTo("snappy");
+        assertThat(paimonIcebergMetadata.properties().get("write.target-file-size-bytes"))
+                .isEqualTo("134217728");
+    }
+
+    @Test
+    @DisplayName("Test reading metadata with complex Iceberg schema")
+    void testReadMetadataWithComplexIcebergSchema() throws Exception {
+        // Create Iceberg table with complex schema
+        TableIdentifier identifier = TableIdentifier.of("testdb", 
"complex_schema_table");
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", 
Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", 
Types.StringType.get()),
+                        Types.NestedField.optional(
+                                3,
+                                "address",
+                                Types.StructType.of(
+                                        Types.NestedField.required(
+                                                4, "street", 
Types.StringType.get()),
+                                        Types.NestedField.required(
+                                                5, "city", 
Types.StringType.get()),
+                                        Types.NestedField.optional(
+                                                6, "zipcode", 
Types.StringType.get()))),
+                        Types.NestedField.optional(
+                                7, "tags", Types.ListType.ofOptional(8, 
Types.StringType.get())),
+                        Types.NestedField.optional(
+                                9,
+                                "metadata",
+                                Types.MapType.ofOptional(
+                                        10, 11, Types.StringType.get(), 
Types.StringType.get())));
+
+        Table icebergTable = restCatalog.buildTable(identifier, schema).create();
+
+        // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+        // Verify schema
+        assertThat(paimonIcebergMetadata.schemas()).hasSize(1);
+        IcebergSchema paimonIcebergSchema = paimonIcebergMetadata.schemas().get(0);
+        assertThat(paimonIcebergSchema.fields()).hasSize(5);
+
+        // Verify basic fields
+        assertThat(paimonIcebergSchema.fields().get(0).name()).isEqualTo("id");
+        assertThat(paimonIcebergSchema.fields().get(0).type()).isEqualTo("int");
+        assertThat(paimonIcebergSchema.fields().get(1).name()).isEqualTo("name");
+        assertThat(paimonIcebergSchema.fields().get(1).type()).isEqualTo("string");
+
+        // Verify complex fields exist
+        assertThat(paimonIcebergSchema.fields().get(2).name()).isEqualTo("address");
+        assertThat(paimonIcebergSchema.fields().get(3).name()).isEqualTo("tags");
+        assertThat(paimonIcebergSchema.fields().get(4).name()).isEqualTo("metadata");
+    }
+
+    @Test
+    @DisplayName("Test reading metadata with Iceberg table evolution")
+    void testReadMetadataWithIcebergTableEvolution() throws Exception {
+        // Create initial Iceberg table
+        Table icebergTable = createBasicIcebergTable("evolution_table");
+
+        // Read initial metadata
+        IcebergMetadata initialMetadata = readIcebergMetadata("evolution_table");
+        assertThat(initialMetadata.schemas()).hasSize(1);
+        assertThat(initialMetadata.schemas().get(0).fields()).hasSize(3);
+        assertThat(initialMetadata.currentSchemaId()).isEqualTo(0);
+
+        // Evolve schema by adding a new column
+        icebergTable.updateSchema().addColumn("email", 
Types.StringType.get()).commit();
+
+        // Read metadata after schema evolution
+        IcebergMetadata evolvedMetadata = readIcebergMetadata("evolution_table");
+
+        // Verify schema evolution
+        assertThat(evolvedMetadata.schemas()).hasSize(2); // Should have 2 schemas now
+        assertThat(evolvedMetadata.currentSchemaId())
+                .isEqualTo(1); // Current schema should be the new one
+
+        // Verify the current schema has the new field
+        IcebergSchema currentSchema =
+                evolvedMetadata.schemas().stream()
+                        .filter(schema -> schema.schemaId() == evolvedMetadata.currentSchemaId())
+                        .findFirst()
+                        .orElseThrow();
+        assertThat(currentSchema.fields()).hasSize(4);
+        assertThat(currentSchema.fields().get(3).name()).isEqualTo("email");
+        assertThat(currentSchema.fields().get(3).type()).isEqualTo("string");
+    }
+
+    @Test
+    @DisplayName("Test reading metadata with Iceberg table partitioning 
evolution")
+    void testReadMetadataWithIcebergPartitioningEvolution() throws Exception {
+        // Create initial unpartitioned Iceberg table
+        Table icebergTable = createBasicIcebergTable("partition_evolution_table");
+
+        // Read initial metadata
+        IcebergMetadata initialMetadata = readIcebergMetadata("partition_evolution_table");
+        assertThat(initialMetadata.partitionSpecs()).hasSize(1);
+        assertThat(initialMetadata.partitionSpecs().get(0).fields()).isEmpty(); // Unpartitioned
+
+        // Evolve partitioning by adding a partition
+        icebergTable.updateSpec().addField("name").commit();
+
+        // Read metadata after partitioning evolution
+        IcebergMetadata evolvedMetadata = readIcebergMetadata("partition_evolution_table");
+
+        // Verify partitioning evolution
+        assertThat(evolvedMetadata.partitionSpecs())
+                .hasSize(2); // Should have 2 partition specs now
+        assertThat(evolvedMetadata.defaultSpecId())
+                .isEqualTo(1); // Default spec should be the new one
+
+        // Verify the current partition spec has the new field
+        IcebergPartitionSpec currentSpec =
+                evolvedMetadata.partitionSpecs().stream()
+                        .filter(spec -> spec.specId() == evolvedMetadata.defaultSpecId())
+                        .findFirst()
+                        .orElseThrow();
+        assertThat(currentSpec.fields()).hasSize(1);
+        assertThat(currentSpec.fields().get(0).name()).isEqualTo("name");
+        assertThat(currentSpec.fields().get(0).transform()).isEqualTo("identity");
+    }
+
+    @Test
+    @DisplayName("Test FORMAT_VERSION_V3 table")
+    void testFormatVersionV3Table() throws Exception {
+        // Create a v3 format version Iceberg table
+        Table icebergTable = createIcebergTableV3("v3_snapshot_table");
+        TableMetadata base = ((HasTableOperations) icebergTable).operations().current();
+        ((HasTableOperations) icebergTable)
+                .operations()
+                .commit(base, TableMetadata.buildFrom(base).enableRowLineage().build());
+
+        // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+        // Verify basic properties
+        assertThat(paimonIcebergMetadata.formatVersion()).isEqualTo(3);
+        assertThat(paimonIcebergMetadata.tableUuid()).isNotNull();
+        assertThat(paimonIcebergMetadata.location()).isNotNull();
+        assertThat(paimonIcebergMetadata.currentSchemaId()).isEqualTo(0);
+        assertThat(paimonIcebergMetadata.defaultSpecId()).isEqualTo(0);
+        assertThat(paimonIcebergMetadata.defaultSortOrderId()).isEqualTo(0);
+
+        // Verify schema
+        assertThat(paimonIcebergMetadata.schemas()).hasSize(1);
+        IcebergSchema schema = paimonIcebergMetadata.schemas().get(0);
+        assertThat(schema.fields()).hasSize(3);
+
+        // Verify field details
+        assertThat(schema.fields().get(0).name()).isEqualTo("id");
+        assertThat(schema.fields().get(0).type()).isEqualTo("int");
+        assertThat(schema.fields().get(0).required()).isTrue();
+
+        assertThat(schema.fields().get(1).name()).isEqualTo("name");
+        assertThat(schema.fields().get(1).type()).isEqualTo("string");
+        assertThat(schema.fields().get(1).required()).isTrue();
+
+        assertThat(schema.fields().get(2).name()).isEqualTo("age");
+        assertThat(schema.fields().get(2).type()).isEqualTo("int");
+        assertThat(schema.fields().get(2).required()).isFalse();
+
+        // Verify partition specs (should be unpartitioned)
+        assertThat(paimonIcebergMetadata.partitionSpecs()).hasSize(1);
+        assertThat(paimonIcebergMetadata.partitionSpecs().get(0).fields()).isEmpty();
+
+        // Perform first append operation
+        icebergTable
+                .newFastAppend()
+                .appendFile(
+                        DataFiles.builder(PartitionSpec.unpartitioned())
+                                .withPath("/path/to/data-v3.parquet")
+                                .withFileSizeInBytes(100)
+                                .withRecordCount(10)
+                                .build())
+                .commit();
+
+        // Read metadata after first operation
+        paimonIcebergMetadata = readIcebergMetadata("v3_snapshot_table");
+
+        // Verify snapshots after first operation
+        assertThat(paimonIcebergMetadata.snapshots()).hasSize(1);
+        assertThat(paimonIcebergMetadata.currentSnapshotId()).isNotNull();
+
+        IcebergSnapshot snapshot = paimonIcebergMetadata.snapshots().get(0);
+        assertThat(snapshot.parentSnapshotId()).isNull();
+
+        assertThat(snapshot.firstRowId()).isEqualTo(0L);
+        assertThat(snapshot.addedRows()).isEqualTo(10L);
+
+        // Verify other snapshot properties
+        assertThat(snapshot.snapshotId()).isNotNull();
+        assertThat(snapshot.sequenceNumber()).isEqualTo(1L);
+        assertThat(snapshot.timestampMs()).isGreaterThan(0);
+        assertThat(snapshot.schemaId()).isEqualTo(0);
+
+        // Perform second append operation
+        icebergTable
+                .newFastAppend()
+                .appendFile(
+                        DataFiles.builder(PartitionSpec.unpartitioned())
+                                .withPath("/path/to/data-v3-2.parquet")
+                                .withFileSizeInBytes(200)
+                                .withRecordCount(20)
+                                .build())
+                .commit();
+
+        // Read metadata after second operation
+        IcebergMetadata paimonIcebergMetadata2 = readIcebergMetadata("v3_snapshot_table");
+
+        // Verify second snapshot
+        assertThat(paimonIcebergMetadata2.snapshots()).hasSize(2);
+        assertThat(paimonIcebergMetadata2.currentSnapshotId())
+                .isEqualTo(paimonIcebergMetadata2.snapshots().get(1).snapshotId());
+        assertThat(paimonIcebergMetadata2.snapshots().get(1).parentSnapshotId())
+                .isEqualTo(paimonIcebergMetadata2.snapshots().get(0).snapshotId());
+
+        // Verify snapshot sequence numbers
+        assertThat(paimonIcebergMetadata2.snapshots().get(0).sequenceNumber()).isEqualTo(1L);
+        assertThat(paimonIcebergMetadata2.snapshots().get(1).sequenceNumber()).isEqualTo(2L);
+    }
+
+    /** Helper method to create a basic Iceberg table with simple schema. */
+    private Table createBasicIcebergTable(String tableName) {
+        TableIdentifier identifier = TableIdentifier.of("testdb", tableName);
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", 
Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", 
Types.StringType.get()),
+                        Types.NestedField.optional(3, "age", 
Types.IntegerType.get()));
+        return restCatalog.buildTable(identifier, schema).create();
+    }
+
+    /** Helper method to create an Iceberg table with partitioning. */
+    private Table createPartitionedIcebergTable(String tableName) {
+        TableIdentifier identifier = TableIdentifier.of("testdb", tableName);
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", 
Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", 
Types.StringType.get()),
+                        Types.NestedField.required(3, "department", 
Types.StringType.get()),
+                        Types.NestedField.required(4, "salary", 
Types.DoubleType.get()));
+        PartitionSpec partitionSpec =
+                
PartitionSpec.builderFor(schema).identity("department").build();
+        return restCatalog.buildTable(identifier, 
schema).withPartitionSpec(partitionSpec).create();
+    }
+
+    /** Helper method to create an Iceberg table with sort order. */
+    private Table createSortedIcebergTable(String tableName) {
+        TableIdentifier identifier = TableIdentifier.of("testdb", tableName);
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", 
Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", 
Types.StringType.get()),
+                        Types.NestedField.required(3, "score", 
Types.DoubleType.get()));
+        SortOrder sortOrder = 
SortOrder.builderFor(schema).asc("score").desc("id").build();
+        return restCatalog.buildTable(identifier, 
schema).withSortOrder(sortOrder).create();
+    }
+
+    /** Helper method to create an Iceberg table with FORMAT_VERSION_V3. */
+    private Table createIcebergTableV3(String tableName) {
+        TableIdentifier identifier = TableIdentifier.of("testdb", tableName);
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", 
Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", 
Types.StringType.get()),
+                        Types.NestedField.optional(3, "age", 
Types.IntegerType.get()));
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("write.format.default", "parquet");
+        properties.put("write.parquet.compression-codec", "snappy");
+        properties.put("format-version", "3");
+
+        return restCatalog.buildTable(identifier, schema).withProperties(properties).create();
+    }
+
+    /** Helper method to read Iceberg metadata using Paimon's IcebergMetadata. */
+    private IcebergMetadata readIcebergMetadata(String tableName) throws Exception {
+        TableIdentifier identifier = TableIdentifier.of("testdb", tableName);
+        Table icebergTable = restCatalog.loadTable(identifier);
+        return readIcebergMetadata(icebergTable);
+    }
+
+    private IcebergMetadata readIcebergMetadata(Table icebergTable) throws Exception {
+        String metaFileLocation = TableUtil.metadataFileLocation(icebergTable);
+        Path metaFilePath = new Path(metaFileLocation);
+        Options options = new Options();
+        FileIO fileIO = FileIO.get(metaFilePath, CatalogContext.create(options));
+        return IcebergMetadata.fromPath(fileIO, metaFilePath);
+    }
+}
diff --git a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
index 6eebe9f1e6..e52ac6e06b 100644
--- a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
+++ b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaChange;
@@ -608,6 +609,70 @@ public class IcebergRestMetadataCommitterTest {
         commit.close();
     }
 
+    @Test
+    public void testParentSnapshotIdTracking() throws Exception {
+        // create and write with paimon client
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        1,
+                        randomFormat(),
+                        Collections.emptyMap());
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        // First commit - should have null parent snapshot ID
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        FileIO fileIO = table.fileIO();
+        IcebergMetadata metadata1 =
+                IcebergMetadata.fromPath(
+                        fileIO, new Path(catalogTableMetadataPath(table), "v1.metadata.json"));
+        assertThat(metadata1.snapshots()).hasSize(1);
+        assertThat(metadata1.snapshots().get(0).parentSnapshotId()).isNull();
+        assertThat(metadata1.snapshots().get(0).snapshotId()).isEqualTo(1);
+
+        // Second commit - should have parent snapshot ID pointing to first snapshot
+        write.write(GenericRow.of(1, 11));
+        write.write(GenericRow.of(3, 30));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        IcebergMetadata metadata2 =
+                IcebergMetadata.fromPath(
+                        fileIO, new Path(catalogTableMetadataPath(table), "v2.metadata.json"));
+        assertThat(metadata2.snapshots()).hasSize(2);
+        // The last snapshot should have parent pointing to the previous snapshot
+        IcebergSnapshot lastSnapshot = metadata2.snapshots().get(metadata2.snapshots().size() - 1);
+        assertThat(lastSnapshot.parentSnapshotId()).isEqualTo(1L);
+        assertThat(lastSnapshot.snapshotId()).isEqualTo(2);
+
+        // Third commit - should have parent snapshot ID pointing to second snapshot
+        write.write(GenericRow.of(2, 21));
+        write.write(GenericRow.of(4, 40));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        IcebergMetadata metadata3 =
+                IcebergMetadata.fromPath(
+                        fileIO, new Path(catalogTableMetadataPath(table), "v3.metadata.json"));
+        assertThat(metadata3.snapshots()).hasSize(3);
+        // The last snapshot should have parent pointing to the previous snapshot
+        IcebergSnapshot lastSnapshot3 = metadata3.snapshots().get(metadata3.snapshots().size() - 1);
+        assertThat(lastSnapshot3.parentSnapshotId()).isEqualTo(2L);
+        assertThat(lastSnapshot3.snapshotId()).isEqualTo(3);
+
+        write.close();
+        commit.close();
+    }
+
     private static class TestRecord {
         private final BinaryRow partition;
         private final GenericRow record;

