This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new fce069f170 Spark 3.5: Fix row lineage inheritance for distributed planning (#13061)
fce069f170 is described below
commit fce069f1704fe5d1840b50014e8ed966377ee0b7
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Fri Jun 27 11:09:53 2025 -0400
Spark 3.5: Fix row lineage inheritance for distributed planning (#13061)
---
.../TestRowLevelOperationsWithLineage.java | 41 ++++++++++++++++++++++
.../iceberg/spark/actions/ManifestFileBean.java | 11 ++++++
2 files changed, 52 insertions(+)
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
index 34f1f5d341..b485b79441 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
@@ -19,6 +19,10 @@
package org.apache.iceberg.spark.extensions;
import static org.apache.iceberg.MetadataColumns.schemaWithRowLineage;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_RANGE;
import static org.apache.iceberg.spark.Spark3Util.loadIcebergTable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
@@ -33,6 +37,7 @@ import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -47,8 +52,10 @@ import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.functions.BucketFunction;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
@@ -81,6 +88,40 @@ public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOpe
createRecord(SCHEMA, 103, "d", 3L, 1L),
createRecord(SCHEMA, 104, "e", 4L, 1L));
+ @Parameters(
+ name =
+ "catalogName = {0}, implementation = {1}, config = {2},"
+ + " format = {3}, vectorized = {4}, distributionMode = {5},"
+ " fanout = {6}, branch = {7}, planningMode = {8}, formatVersion = {9}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {
+ "testhadoop",
+ SparkCatalog.class.getName(),
+ ImmutableMap.of("type", "hadoop"),
+ FileFormat.PARQUET,
+ true,
+ WRITE_DISTRIBUTION_MODE_HASH,
+ true,
+ null,
+ LOCAL,
+ 3
+ },
+ {
+ "testhadoop",
+ SparkCatalog.class.getName(),
+ ImmutableMap.of("type", "hadoop"),
+ FileFormat.PARQUET,
+ false,
+ WRITE_DISTRIBUTION_MODE_RANGE,
+ true,
+ null,
+ DISTRIBUTED,
+ 3
+ },
+ };
+ }
+
@BeforeAll
public static void setupSparkConf() {
spark.conf().set("spark.sql.shuffle.partitions", "4");
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
index 11ad834244..fd46398977 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
@@ -36,6 +36,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
private Long addedSnapshotId = null;
private Integer content = null;
private Long sequenceNumber = null;
+ private Long firstRowId = null;
public static ManifestFileBean fromManifest(ManifestFile manifest) {
ManifestFileBean bean = new ManifestFileBean();
@@ -46,6 +47,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
bean.setAddedSnapshotId(manifest.snapshotId());
bean.setContent(manifest.content().id());
bean.setSequenceNumber(manifest.sequenceNumber());
+ bean.setFirstRowId(manifest.firstRowId());
return bean;
}
@@ -98,6 +100,10 @@ public class ManifestFileBean implements ManifestFile, Serializable {
this.sequenceNumber = sequenceNumber;
}
+ public void setFirstRowId(Long firstRowId) {
+ this.firstRowId = firstRowId;
+ }
+
@Override
public String path() {
return path;
@@ -173,6 +179,11 @@ public class ManifestFileBean implements ManifestFile, Serializable {
return null;
}
+ @Override
+ public Long firstRowId() {
+ return firstRowId;
+ }
+
@Override
public ManifestFile copy() {
throw new UnsupportedOperationException("Cannot copy");