This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.10.x by this push:
     new 93cb59ace3 Spark 3.4: Pass format-version when creating a snapshot (#14170) (#14171)
93cb59ace3 is described below

commit 93cb59ace3a299c41104aa5c7d311b8bfbff4e36
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Sep 24 01:42:17 2025 +0200

    Spark 3.4: Pass format-version when creating a snapshot (#14170) (#14171)
---
 .../extensions/TestSnapshotTableProcedure.java     | 94 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java   | 37 ++++++---
 2 files changed, 121 insertions(+), 10 deletions(-)

diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
index 2ca8de50fa..d5d7dd1993 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
@@ -27,10 +27,18 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.List;
 import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.avro.GenericAvroReader;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.AnalysisException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestTemplate;
@@ -263,6 +271,14 @@ public class TestSnapshotTableProcedure extends ExtensionsTestBase {
         .hasMessage("Parallelism should be larger than 0");
   }
 
+  private static final Schema SNAPSHOT_ID_READ_SCHEMA =
+      new Schema(
+          Types.NestedField.required("snapshot_id")
+              .withId(1)
+              .ofType(Types.LongType.get())
+              .asOptional()
+              .build());
+
   @TestTemplate
   public void testSnapshotPartitionedWithParallelism() throws IOException {
     String location = Files.createTempDirectory(temp, "junit").toFile().toString();
@@ -282,4 +298,82 @@ public class TestSnapshotTableProcedure extends ExtensionsTestBase {
         ImmutableList.of(row("a", 1L), row("b", 2L)),
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
+
+  @TestTemplate
+  public void testSnapshotPartitioned() throws IOException {
+    String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
+        SOURCE_NAME, location);
+    sql("INSERT INTO TABLE %s (id, data) VALUES (1, 'a'), (2, 'b')", SOURCE_NAME);
+    assertThat(
+            sql(
+                "CALL %s.system.snapshot(source_table => '%s', table => '%s')",
+                catalogName, SOURCE_NAME, tableName))
+        .containsExactly(row(2L));
+    assertThat(sql("SELECT * FROM %s ORDER BY id", tableName))
+        .containsExactly(row("a", 1L), row("b", 2L));
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row("a", 1L), row("b", 2L)),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+
+    for (ManifestFile manifest : createdTable.currentSnapshot().dataManifests(createdTable.io())) {
+      try (AvroIterable<GenericData.Record> reader =
+          Avro.read(org.apache.iceberg.Files.localInput(manifest.path()))
+              .project(SNAPSHOT_ID_READ_SCHEMA)
+              .createResolvingReader(GenericAvroReader::create)
+              .build()) {
+
+        assertThat(reader.getMetadata()).containsEntry("format-version", "2");
+
+        List<GenericData.Record> records = Lists.newArrayList(reader.iterator());
+        for (GenericData.Record row : records) {
+          assertThat(row.get(0)).as("Field-ID should be inherited").isNull();
+        }
+      }
+    }
+  }
+
+  @TestTemplate
+  public void testSnapshotPartitionedV1() throws IOException {
+    String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
+        SOURCE_NAME, location);
+    sql("INSERT INTO TABLE %s (id, data) VALUES (1, 'a'), (2, 'b')", SOURCE_NAME);
+    assertThat(
+            sql(
+                "CALL %s.system.snapshot(source_table => '%s', table => '%s', properties => map('format-version', '1'))",
+                catalogName, SOURCE_NAME, tableName))
+        .containsExactly(row(2L));
+    assertThat(sql("SELECT * FROM %s ORDER BY id", tableName))
+        .containsExactly(row("a", 1L), row("b", 2L));
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row("a", 1L), row("b", 2L)),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+
+    for (ManifestFile manifest : createdTable.currentSnapshot().dataManifests(createdTable.io())) {
+      try (AvroIterable<GenericData.Record> reader =
+          Avro.read(org.apache.iceberg.Files.localInput(manifest.path()))
+              .project(SNAPSHOT_ID_READ_SCHEMA)
+              .createResolvingReader(GenericAvroReader::create)
+              .build()) {
+
+        assertThat(reader.getMetadata()).containsEntry("format-version", "1");
+
+        List<GenericData.Record> records = Lists.newArrayList(reader.iterator());
+        for (GenericData.Record row : records) {
+          assertThat(row.get(0)).as("Field-ID should not be inherited").isNotNull();
+        }
+      }
+    }
+  }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index ae819020d8..ade2bd5b5c 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -362,6 +362,8 @@ public class SparkTableUtil {
   }
 
   private static Iterator<ManifestFile> buildManifest(
+      int formatVersion,
+      Long snapshotId,
       SerializableConfiguration conf,
       PartitionSpec spec,
       String basePath,
@@ -379,7 +381,8 @@ public class SparkTableUtil {
       Path location = new Path(basePath, suffix);
       String outputPath = FileFormat.AVRO.addExtension(location.toString());
       OutputFile outputFile = io.newOutputFile(outputPath);
-      ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
+      ManifestWriter<DataFile> writer =
+          ManifestFiles.write(formatVersion, spec, outputFile, snapshotId);
 
       try (ManifestWriter<DataFile> writerRef = writer) {
         fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
@@ -867,6 +870,21 @@ public class SparkTableUtil {
               DUPLICATE_FILE_MESSAGE, Joiner.on(",").join((String[]) duplicates.take(10))));
     }
 
+    TableOperations ops = ((HasTableOperations) targetTable).operations();
+    int formatVersion = ops.current().formatVersion();
+    boolean snapshotIdInheritanceEnabled =
+        PropertyUtil.propertyAsBoolean(
+            targetTable.properties(),
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
+
+    final Long snapshotId;
+    if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
+      snapshotId = -1L;
+    } else {
+      snapshotId = null;
+    }
+
     List<ManifestFile> manifests =
         filesToImport
             .repartition(numShufflePartitions)
@@ -877,19 +895,18 @@ public class SparkTableUtil {
             .orderBy(col("_1"))
             .mapPartitions(
                 (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
-                    fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
+                    fileTuple ->
+                        buildManifest(
+                            formatVersion,
+                            snapshotId,
+                            serializableConf,
+                            spec,
+                            stagingDir,
+                            fileTuple),
                 Encoders.javaSerialization(ManifestFile.class))
             .collectAsList();
 
     try {
-      TableOperations ops = ((HasTableOperations) targetTable).operations();
-      int formatVersion = ops.current().formatVersion();
-      boolean snapshotIdInheritanceEnabled =
-          PropertyUtil.propertyAsBoolean(
-              targetTable.properties(),
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
-
       AppendFiles append = targetTable.newAppend();
       manifests.forEach(append::appendManifest);
       append.commit();

Reply via email to