This is an automated email from the ASF dual-hosted git repository.
singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 63ecc7c387 Spark: fix NPE thrown for MAP/LIST columns on DELETE, UPDATE, and MERGE operations (#15726)
63ecc7c387 is described below
commit 63ecc7c3871ec1a1846c2d335c2055ea1865a81f
Author: antonlin1 <[email protected]>
AuthorDate: Wed Mar 25 07:06:56 2026 +0100
Spark: fix NPE thrown for MAP/LIST columns on DELETE, UPDATE, and MERGE operations (#15726)
* Spark: fix _partition child ID collision with MAP/LIST columns in
allUsedFieldIds
BaseSparkScanBuilder.allUsedFieldIds() used TypeUtil.getProjectedIds(), which
omits MAP and LIST field IDs (it is designed for column projection, not
collision avoidance). This caused _partition struct child IDs to be reassigned
to the same IDs as MAP/LIST columns, triggering an NPE in
PruneColumns.isStruct() during merge-on-read scans when the _partition
metadata column is included in the projection.
Fix: use TypeUtil.indexById(), which indexes ALL field IDs recursively,
matching the behavior of the pre-1.11 Spark 3.5 code that this replaced.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <[email protected]>
* rename test + add list column test
* add `_partition` to final assert select query in regression tests
---------
Co-authored-by: Claude Sonnet 4.6 (1M context) <[email protected]>
---
.../iceberg/spark/source/BaseSparkScanBuilder.java | 2 +-
.../spark/source/TestSparkMetadataColumns.java | 82 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 1 deletion(-)
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
index ec72a1505a..be34eaeed5 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
@@ -230,7 +230,7 @@ abstract class BaseSparkScanBuilder implements ScanBuilder {
// collects used data field IDs across all known table schemas
private Set<Integer> allUsedFieldIds() {
return table.schemas().values().stream()
-        .flatMap(tableSchema -> TypeUtil.getProjectedIds(tableSchema.asStruct()).stream())
+        .flatMap(tableSchema -> TypeUtil.indexById(tableSchema.asStruct()).keySet().stream())
.collect(Collectors.toSet());
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index 68f6f17aae..7498f129b2 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -343,6 +343,88 @@ public class TestSparkMetadataColumns extends TestBase {
}
}
+  @TestTemplate
+  public void testPartitionMetadataColumnWithMapColumn() throws IOException {
+    assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
+
+    Schema mapSchema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.LongType.get()),
+            Types.NestedField.required(2, "ts", Types.LongType.get()),
+            Types.NestedField.optional(
+                3,
+                "tags",
+                Types.MapType.ofOptional(4, 5, Types.StringType.get(), Types.StringType.get())));
+    PartitionSpec bucketSpec = PartitionSpec.builderFor(mapSchema).bucket("id", 1).build();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(FORMAT_VERSION, String.valueOf(formatVersion));
+    properties.put(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
+    properties.put(PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized));
+    // merge-on-read: DELETE writes position delete files instead of rewriting data files.
+    // This routes through SupportsDelta which adds _partition to the scan projection.
+    properties.put("write.delete.mode", "merge-on-read");
+
+    String mapTableName = "test_map_partition_collision";
+    TestTables.create(
+        Files.createTempDirectory(temp, "junit").toFile(),
+        mapTableName,
+        mapSchema,
+        bucketSpec,
+        properties);
+
+    // Both rows in a single INSERT so they land in the same Parquet file.
+    // With both rows sharing a file, Spark uses merge-on-read which adds _partition to the scan
+    // projection.
+    sql(
+        "INSERT INTO TABLE %s VALUES (1, 1000, map('env', 'prod')), (2, 9999999999999999, map('env', 'dev'))",
+        mapTableName);
+
+    sql("DELETE FROM %s WHERE ts < 9999999999999999", mapTableName);
+    assertThat(sql("SELECT id, _partition FROM %s", mapTableName)).hasSize(1);
+  }
+
+  @TestTemplate
+  public void testPartitionMetadataColumnWithListColumn() throws IOException {
+    assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
+
+    Schema listSchema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.LongType.get()),
+            Types.NestedField.required(2, "ts", Types.LongType.get()),
+            Types.NestedField.optional(
+                3, "tags", Types.ListType.ofOptional(4, Types.StringType.get())));
+    PartitionSpec bucketSpec = PartitionSpec.builderFor(listSchema).bucket("id", 1).build();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(FORMAT_VERSION, String.valueOf(formatVersion));
+    properties.put(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
+    properties.put(PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized));
+    // merge-on-read: DELETE writes position delete files instead of rewriting data files.
+    // This routes through SupportsDelta which adds _partition to the scan projection.
+    properties.put("write.delete.mode", "merge-on-read");
+
+    String listTableName = "test_list_partition_collision";
+    TestTables.create(
+        Files.createTempDirectory(temp, "junit").toFile(),
+        listTableName,
+        listSchema,
+        bucketSpec,
+        properties);
+
+    // Both rows in a single INSERT so they land in the same Parquet file.
+    // With both rows sharing a file, Spark uses merge-on-read which adds _partition to the scan
+    // projection.
+    sql(
+        "INSERT INTO TABLE %s VALUES (1, 1000, array('prod')), (2, 9999999999999999, array('dev'))",
+        listTableName);
+
+    sql("DELETE FROM %s WHERE ts < 9999999999999999", listTableName);
+    assertThat(sql("SELECT id, _partition FROM %s", listTableName)).hasSize(1);
+  }
+
private void createAndInitTable() throws IOException {
Map<String, String> properties = Maps.newHashMap();
properties.put(FORMAT_VERSION, String.valueOf(formatVersion));