This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new fce069f170 Spark 3.5: Fix row lineage inheritance for distributed planning (#13061)
fce069f170 is described below

commit fce069f1704fe5d1840b50014e8ed966377ee0b7
Author: Amogh Jahagirdar <amo...@apache.org>
AuthorDate: Fri Jun 27 11:09:53 2025 -0400

    Spark 3.5: Fix row lineage inheritance for distributed planning (#13061)
---
 .../TestRowLevelOperationsWithLineage.java         | 41 ++++++++++++++++++++++
 .../iceberg/spark/actions/ManifestFileBean.java    | 11 ++++++
 2 files changed, 52 insertions(+)
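
For context (not part of this commit): with Iceberg v3 row lineage, a data file is assigned a first_row_id, and any row whose _row_id is null inherits one from that base plus the row's ordinal position in the file. A minimal sketch of that inheritance rule, using illustrative names rather than APIs from this change:

    // Sketch of the v3 row-lineage inheritance rule: a row without an
    // explicit _row_id inherits the file's firstRowId plus the row's
    // position. `firstRowId` and `pos` are illustrative local names,
    // not identifiers from this commit.
    static long inheritedRowId(long firstRowId, long pos) {
      return firstRowId + pos;
    }

Manifests carry a first_row_id as well, which is why dropping it in the Spark-side bean (second file below) broke inheritance when planning was distributed.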

diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
index 34f1f5d341..b485b79441 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
@@ -19,6 +19,10 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.MetadataColumns.schemaWithRowLineage;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_RANGE;
 import static org.apache.iceberg.spark.Spark3Util.loadIcebergTable;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
@@ -33,6 +37,7 @@ import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -47,8 +52,10 @@ import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.functions.BucketFunction;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
@@ -81,6 +88,40 @@ public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOpe
           createRecord(SCHEMA, 103, "d", 3L, 1L),
           createRecord(SCHEMA, 104, "e", 4L, 1L));
 
+  @Parameters(
+      name =
+          "catalogName = {0}, implementation = {1}, config = {2},"
+              + " format = {3}, vectorized = {4}, distributionMode = {5},"
+              + " fanout = {6}, branch = {7}, planningMode = {8}, formatVersion = {9}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        "testhadoop",
+        SparkCatalog.class.getName(),
+        ImmutableMap.of("type", "hadoop"),
+        FileFormat.PARQUET,
+        true,
+        WRITE_DISTRIBUTION_MODE_HASH,
+        true,
+        null,
+        LOCAL,
+        3
+      },
+      {
+        "testhadoop",
+        SparkCatalog.class.getName(),
+        ImmutableMap.of("type", "hadoop"),
+        FileFormat.PARQUET,
+        false,
+        WRITE_DISTRIBUTION_MODE_RANGE,
+        true,
+        null,
+        DISTRIBUTED,
+        3
+      },
+    };
+  }
+
   @BeforeAll
   public static void setupSparkConf() {
     spark.conf().set("spark.sql.shuffle.partitions", "4");
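
The new parameter rows above run the lineage tests under both LOCAL and DISTRIBUTED planning, the latter being the path that previously dropped firstRowId. Planning mode is controlled through table properties; a hypothetical helper for putting a test table into distributed planning (constants assumed from org.apache.iceberg.TableProperties, the value string assumed from PlanningMode):

    // Hypothetical helper, not from this commit: switch a table's scan
    // planning to the distributed (Spark-parallelized) mode via the
    // read.data-planning-mode / read.delete-planning-mode properties.
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;

    class PlanningModes {
      static void useDistributedPlanning(Table table) {
        table
            .updateProperties()
            .set(TableProperties.DATA_PLANNING_MODE, "distributed")
            .set(TableProperties.DELETE_PLANNING_MODE, "distributed")
            .commit();
      }
    }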
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
index 11ad834244..fd46398977 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
@@ -36,6 +36,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
   private Long addedSnapshotId = null;
   private Integer content = null;
   private Long sequenceNumber = null;
+  private Long firstRowId = null;
 
   public static ManifestFileBean fromManifest(ManifestFile manifest) {
     ManifestFileBean bean = new ManifestFileBean();
@@ -46,6 +47,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
     bean.setAddedSnapshotId(manifest.snapshotId());
     bean.setContent(manifest.content().id());
     bean.setSequenceNumber(manifest.sequenceNumber());
+    bean.setFirstRowId(manifest.firstRowId());
 
     return bean;
   }
@@ -98,6 +100,10 @@ public class ManifestFileBean implements ManifestFile, Serializable {
     this.sequenceNumber = sequenceNumber;
   }
 
+  public void setFirstRowId(Long firstRowId) {
+    this.firstRowId = firstRowId;
+  }
+
   @Override
   public String path() {
     return path;
@@ -173,6 +179,11 @@ public class ManifestFileBean implements ManifestFile, Serializable {
     return null;
   }
 
+  @Override
+  public Long firstRowId() {
+    return firstRowId;
+  }
+
   @Override
   public ManifestFile copy() {
     throw new UnsupportedOperationException("Cannot copy");
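
Net effect of the fix: ManifestFileBean is the serializable copy of manifest metadata that Spark ships to executors during distributed planning, and before this change it silently dropped firstRowId, leaving row-ID inheritance with a null base. A sketch of the round-trip property the change restores (assumed test shape, not the commit's own test; it uses only the APIs shown in the diff above):

    // Sketch: fromManifest should now preserve firstRowId across the
    // bean conversion used by distributed planning.
    import java.util.Objects;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.spark.actions.ManifestFileBean;

    class BeanRoundTrip {
      static void assertFirstRowIdPreserved(ManifestFile manifest) {
        ManifestFileBean bean = ManifestFileBean.fromManifest(manifest);
        if (!Objects.equals(manifest.firstRowId(), bean.firstRowId())) {
          throw new AssertionError("firstRowId dropped in ManifestFileBean conversion");
        }
      }
    }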
