This is an automated email from the ASF dual-hosted git repository.

singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 63ecc7c387 Spark: fix NPE thrown for MAP/LIST columns on DELETE, 
UPDATE, and MERGE operations (#15726)
63ecc7c387 is described below

commit 63ecc7c3871ec1a1846c2d335c2055ea1865a81f
Author: antonlin1 <[email protected]>
AuthorDate: Wed Mar 25 07:06:56 2026 +0100

    Spark: fix NPE thrown for MAP/LIST columns on DELETE, UPDATE, and MERGE 
operations (#15726)
    
    * Spark: fix _partition child ID collision with MAP/LIST columns in 
allUsedFieldIds
    
    BaseSparkScanBuilder.allUsedFieldIds() used TypeUtil.getProjectedIds() which
    omits MAP and LIST field IDs (it is designed for column projection, not 
collision
    avoidance). This caused _partition struct child IDs to be reassigned to the 
same
    IDs as MAP/LIST columns, triggering an NPE in PruneColumns.isStruct() during
    merge-on-read scans when the _partition metadata column is included in the
    projection.
    
    Fix: use TypeUtil.indexById() which indexes ALL field IDs recursively, 
matching
    the behavior of the pre-1.11 Spark 3.5 code that this replaced.
    
    Co-Authored-By: Claude Sonnet 4.6 (1M context) <[email protected]>
    
    * rename test + add list column test
    
    * add `_partition` to final assert select query in regression tests
    
    ---------
    
    Co-authored-by: Claude Sonnet 4.6 (1M context) <[email protected]>
---
 .../iceberg/spark/source/BaseSparkScanBuilder.java |  2 +-
 .../spark/source/TestSparkMetadataColumns.java     | 82 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 1 deletion(-)

diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
index ec72a1505a..be34eaeed5 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
@@ -230,7 +230,7 @@ abstract class BaseSparkScanBuilder implements ScanBuilder {
   // collects used data field IDs across all known table schemas
   private Set<Integer> allUsedFieldIds() {
     return table.schemas().values().stream()
-        .flatMap(tableSchema -> 
TypeUtil.getProjectedIds(tableSchema.asStruct()).stream())
+        .flatMap(tableSchema -> 
TypeUtil.indexById(tableSchema.asStruct()).keySet().stream())
         .collect(Collectors.toSet());
   }
 
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index 68f6f17aae..7498f129b2 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -343,6 +343,88 @@ public class TestSparkMetadataColumns extends TestBase {
     }
   }
 
+  @TestTemplate
+  public void testPartitionMetadataColumnWithMapColumn() throws IOException {
+    assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
+
+    Schema mapSchema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.LongType.get()),
+            Types.NestedField.required(2, "ts", Types.LongType.get()),
+            Types.NestedField.optional(
+                3,
+                "tags",
+                Types.MapType.ofOptional(4, 5, Types.StringType.get(), 
Types.StringType.get())));
+    PartitionSpec bucketSpec = 
PartitionSpec.builderFor(mapSchema).bucket("id", 1).build();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(FORMAT_VERSION, String.valueOf(formatVersion));
+    properties.put(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
+    properties.put(PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized));
+    // merge-on-read: DELETE writes position delete files instead of rewriting 
data files.
+    // This routes through SupportsDelta which adds _partition to the scan 
projection.
+    properties.put("write.delete.mode", "merge-on-read");
+
+    String mapTableName = "test_map_partition_collision";
+    TestTables.create(
+        Files.createTempDirectory(temp, "junit").toFile(),
+        mapTableName,
+        mapSchema,
+        bucketSpec,
+        properties);
+
+    // Both rows in a single INSERT so they land in the same Parquet file.
+    // With both rows sharing a file, Spark uses merge-on-read which adds 
_partition to the scan
+    // projection.
+    sql(
+        "INSERT INTO TABLE %s VALUES (1, 1000, map('env', 'prod')), (2, 
9999999999999999, map('env', 'dev'))",
+        mapTableName);
+
+    sql("DELETE FROM %s WHERE ts < 9999999999999999", mapTableName);
+    assertThat(sql("SELECT id, _partition FROM %s", mapTableName)).hasSize(1);
+  }
+
+  @TestTemplate
+  public void testPartitionMetadataColumnWithListColumn() throws IOException {
+    assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
+
+    Schema listSchema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.LongType.get()),
+            Types.NestedField.required(2, "ts", Types.LongType.get()),
+            Types.NestedField.optional(
+                3, "tags", Types.ListType.ofOptional(4, 
Types.StringType.get())));
+    PartitionSpec bucketSpec = 
PartitionSpec.builderFor(listSchema).bucket("id", 1).build();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(FORMAT_VERSION, String.valueOf(formatVersion));
+    properties.put(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
+    properties.put(PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized));
+    // merge-on-read: DELETE writes position delete files instead of rewriting 
data files.
+    // This routes through SupportsDelta which adds _partition to the scan 
projection.
+    properties.put("write.delete.mode", "merge-on-read");
+
+    String listTableName = "test_list_partition_collision";
+    TestTables.create(
+        Files.createTempDirectory(temp, "junit").toFile(),
+        listTableName,
+        listSchema,
+        bucketSpec,
+        properties);
+
+    // Both rows in a single INSERT so they land in the same Parquet file.
+    // With both rows sharing a file, Spark uses merge-on-read which adds 
_partition to the scan
+    // projection.
+    sql(
+        "INSERT INTO TABLE %s VALUES (1, 1000, array('prod')), (2, 
9999999999999999, array('dev'))",
+        listTableName);
+
+    sql("DELETE FROM %s WHERE ts < 9999999999999999", listTableName);
+    assertThat(sql("SELECT id, _partition FROM %s", listTableName)).hasSize(1);
+  }
+
   private void createAndInitTable() throws IOException {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(FORMAT_VERSION, String.valueOf(formatVersion));

Reply via email to