This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new fce069f170 Spark 3.5: Fix row lineage inheritance for distributed planning (#13061)
fce069f170 is described below

commit fce069f1704fe5d1840b50014e8ed966377ee0b7
Author: Amogh Jahagirdar <amo...@apache.org>
AuthorDate: Fri Jun 27 11:09:53 2025 -0400

    Spark 3.5: Fix row lineage inheritance for distributed planning (#13061)
---
 .../TestRowLevelOperationsWithLineage.java      | 41 ++++++++++++++++++++++
 .../iceberg/spark/actions/ManifestFileBean.java | 11 ++++++
 2 files changed, 52 insertions(+)

diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
index 34f1f5d341..b485b79441 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
@@ -19,6 +19,10 @@ package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.MetadataColumns.schemaWithRowLineage;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_RANGE;
 import static org.apache.iceberg.spark.Spark3Util.loadIcebergTable;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
@@ -33,6 +37,7 @@ import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -47,8 +52,10 @@ import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.functions.BucketFunction;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
@@ -81,6 +88,40 @@ public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOpe
         createRecord(SCHEMA, 103, "d", 3L, 1L),
         createRecord(SCHEMA, 104, "e", 4L, 1L));
 
+  @Parameters(
+      name =
+          "catalogName = {0}, implementation = {1}, config = {2},"
+              + " format = {3}, vectorized = {4}, distributionMode = {5},"
+              + " fanout = {6}, branch = {7}, planningMode = {8}, formatVersion = {9}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        "testhadoop",
+        SparkCatalog.class.getName(),
+        ImmutableMap.of("type", "hadoop"),
+        FileFormat.PARQUET,
+        true,
+        WRITE_DISTRIBUTION_MODE_HASH,
+        true,
+        null,
+        LOCAL,
+        3
+      },
+      {
+        "testhadoop",
+        SparkCatalog.class.getName(),
+        ImmutableMap.of("type", "hadoop"),
+        FileFormat.PARQUET,
+        false,
+        WRITE_DISTRIBUTION_MODE_RANGE,
+        true,
+        null,
+        DISTRIBUTED,
+        3
+      },
+    };
+  }
+
   @BeforeAll
   public static void setupSparkConf() {
     spark.conf().set("spark.sql.shuffle.partitions", "4");

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
index 11ad834244..fd46398977 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
@@ -36,6 +36,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
   private Long addedSnapshotId = null;
   private Integer content = null;
   private Long sequenceNumber = null;
+  private Long firstRowId = null;
 
   public static ManifestFileBean fromManifest(ManifestFile manifest) {
     ManifestFileBean bean = new ManifestFileBean();
@@ -46,6 +47,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
     bean.setAddedSnapshotId(manifest.snapshotId());
     bean.setContent(manifest.content().id());
     bean.setSequenceNumber(manifest.sequenceNumber());
+    bean.setFirstRowId(manifest.firstRowId());
 
     return bean;
   }
@@ -98,6 +100,10 @@ public class ManifestFileBean implements ManifestFile, Serializable {
     this.sequenceNumber = sequenceNumber;
   }
 
+  public void setFirstRowId(Long firstRowId) {
+    this.firstRowId = firstRowId;
+  }
+
   @Override
   public String path() {
     return path;
@@ -173,6 +179,11 @@ public class ManifestFileBean implements ManifestFile, Serializable {
     return null;
   }
 
+  @Override
+  public Long firstRowId() {
+    return firstRowId;
+  }
+
   @Override
   public ManifestFile copy() {
     throw new UnsupportedOperationException("Cannot copy");
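
Context for the change (not part of the commit itself): with distributed planning, manifests are shipped to Spark executors as ManifestFileBean rows, so any ManifestFile field the bean does not carry is silently dropped in that round trip. Before this patch the bean had no firstRowId, so manifests came back with firstRowId() == null and readers could not assign inherited _row_id values for row lineage. Below is a minimal sketch of the round-trip invariant the patch restores; the class FirstRowIdRoundTripCheck and its verify() helper are hypothetical illustrations, and the manifest is assumed to come from a format-version-3 table with row lineage enabled.

import java.util.Objects;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.spark.actions.ManifestFileBean;

class FirstRowIdRoundTripCheck {
  // Hypothetical helper (not in the patch): checks that converting a manifest
  // to a ManifestFileBean, as distributed planning does, preserves firstRowId.
  static void verify(ManifestFile manifest) {
    ManifestFileBean bean = ManifestFileBean.fromManifest(manifest);
    // Prior to fce069f170, bean.firstRowId() was always null here because
    // fromManifest() never copied the field from the source manifest.
    if (!Objects.equals(bean.firstRowId(), manifest.firstRowId())) {
      throw new IllegalStateException("firstRowId lost in ManifestFileBean round trip");
    }
  }
}

The parameterized test matrix added above exercises exactly this path by running the row-lineage suite once with LOCAL and once with DISTRIBUTED planning, so a regression in the bean conversion would fail the DISTRIBUTED case.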