This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 32b2f00125 Spark: Replicate position delete array/map fix to Spark 
3.4, 3.5, and 4.0 (#15743)
32b2f00125 is described below

commit 32b2f00125261b93c29aca9068d78390135a80f1
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Mar 23 14:32:39 2026 -0700

    Spark: Replicate position delete array/map fix to Spark 3.4, 3.5, and 4.0 
(#15743)
    
    Follow-up to #15632, which updated Spark 4.1 only. Port the same 
PositionDeletesRowReader residual extraction (retain all non-constant field IDs 
for extractByIdInclusive, not only primitives) and the matching 
rewrite_position_delete_files and position_deletes metadata tests to the Spark 
3.4, 3.5, and 4.0 modules.
---
 .../TestRewritePositionDeleteFilesProcedure.java   | 62 ++++++++++++++++++++++
 .../spark/source/PositionDeletesRowReader.java     | 21 +++-----
 .../spark/source/TestPositionDeletesTable.java     | 54 +++++++++++++++++++
 .../TestRewritePositionDeleteFilesProcedure.java   | 62 ++++++++++++++++++++++
 .../spark/source/PositionDeletesRowReader.java     | 21 +++-----
 .../spark/source/TestPositionDeletesTable.java     | 54 +++++++++++++++++++
 .../TestRewritePositionDeleteFilesProcedure.java   | 62 ++++++++++++++++++++++
 .../spark/source/PositionDeletesRowReader.java     | 21 +++-----
 .../spark/source/TestPositionDeletesTable.java     | 54 +++++++++++++++++++
 9 files changed, 369 insertions(+), 42 deletions(-)

diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 0ff3a949ae..e5daaf65d7 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -245,6 +245,68 @@ public class TestRewritePositionDeleteFilesProcedure 
extends ExtensionsTestBase
             EnvironmentContext.ENGINE_VERSION, v -> 
assertThat(v).startsWith("3.4"));
   }
 
+  @TestTemplate
+  public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, items 
ARRAY<STRUCT<value:BIGINT, count:INT>>) "
+            + "USING iceberg TBLPROPERTIES "
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 
'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', array(named_struct('value', cast(10 as bigint), 
'count', 1))), "
+            + "(2, 'b', array(named_struct('value', cast(20 as bigint), 
'count', 2))), "
+            + "(3, 'c', array(named_struct('value', cast(30 as bigint), 
'count', 3))), "
+            + "(4, 'd', array(named_struct('value', cast(40 as bigint), 
'count', 4))), "
+            + "(5, 'e', array(named_struct('value', cast(50 as bigint), 
'count', 5))), "
+            + "(6, 'f', array(named_struct('value', cast(60 as bigint), 
'count', 6)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
+  @TestTemplate
+  public void testRewritePositionDeletesWithMapColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, props MAP<STRING, BIGINT>) "
+            + "USING iceberg TBLPROPERTIES "
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 
'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', map('x', cast(10 as bigint))), "
+            + "(2, 'b', map('y', cast(20 as bigint))), "
+            + "(3, 'c', map('z', cast(30 as bigint))), "
+            + "(4, 'd', map('w', cast(40 as bigint))), "
+            + "(5, 'e', map('v', cast(50 as bigint))), "
+            + "(6, 'f', map('u', cast(60 as bigint)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
   private Map<String, String> snapshotSummary() {
     return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
   }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 8ad1f3ad39..9454808f00 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -19,8 +19,6 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.PositionDeletesScanTask;
@@ -32,7 +30,6 @@ import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
@@ -88,12 +85,16 @@ class PositionDeletesRowReader extends 
BaseRowReader<PositionDeletesScanTask>
     InputFile inputFile = getInputFile(task.file().location());
     Preconditions.checkNotNull(inputFile, "Could not find InputFile associated 
with %s", task);
 
-    // select out constant fields when pushing down filter to row reader
+    // Retain predicates on non-constant fields for row reader filter
     Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
-    Set<Integer> nonConstantFieldIds = nonConstantFieldIds(idToConstant);
+    int[] nonConstantFieldIds =
+        expectedSchema().idToName().keySet().stream()
+            .filter(id -> !idToConstant.containsKey(id))
+            .mapToInt(Integer::intValue)
+            .toArray();
     Expression residualWithoutConstants =
         ExpressionUtil.extractByIdInclusive(
-            task.residual(), expectedSchema(), caseSensitive(), 
Ints.toArray(nonConstantFieldIds));
+            task.residual(), expectedSchema(), caseSensitive(), 
nonConstantFieldIds);
 
     if (ContentFileUtil.isDV(task.file())) {
       return new DVIterator(inputFile, task.file(), expectedSchema(), 
idToConstant);
@@ -109,12 +110,4 @@ class PositionDeletesRowReader extends 
BaseRowReader<PositionDeletesScanTask>
             idToConstant)
         .iterator();
   }
-
-  private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
-    Set<Integer> fields = expectedSchema().idToName().keySet();
-    return fields.stream()
-        .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
-        .filter(id -> !idToConstant.containsKey(id))
-        .collect(Collectors.toSet());
-  }
 }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 81954c19b5..0a5e064ace 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -209,6 +209,60 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     dropTable(tableName);
   }
 
+  @TestTemplate
+  public void testArrayColumnFilter() throws IOException {
+    assumeThat(formatVersion)
+        .as("Row content in position_deletes is required for array column 
filter test")
+        .isEqualTo(2);
+    String tableName = "array_column_filter";
+    Schema schemaWithArray =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()),
+            Types.NestedField.optional(
+                3, "arr_col", Types.ListType.ofOptional(4, 
Types.IntegerType.get())));
+    Table tab = createTable(tableName, schemaWithArray, 
PartitionSpec.unpartitioned());
+
+    GenericRecord record1 = GenericRecord.create(schemaWithArray);
+    record1.set(0, 1);
+    record1.set(1, "a");
+    record1.set(2, ImmutableList.of(1, 2));
+    GenericRecord record2 = GenericRecord.create(schemaWithArray);
+    record2.set(0, 2);
+    record2.set(1, "b");
+    record2.set(2, ImmutableList.of(3, 4));
+    List<Record> dataRecords = ImmutableList.of(record1, record2);
+    DataFile dFile =
+        FileHelpers.writeDataFile(
+            tab,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(),
+            dataRecords);
+    tab.newAppend().appendFile(dFile).commit();
+
+    List<PositionDelete<?>> deletes =
+        ImmutableList.of(
+            positionDelete(schemaWithArray, dFile.location(), 0L, 1, "a", 
ImmutableList.of(1, 2)),
+            positionDelete(schemaWithArray, dFile.location(), 1L, 2, "b", 
ImmutableList.of(3, 4)));
+    DeleteFile posDeletes =
+        FileHelpers.writePosDeleteFile(
+            tab,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(),
+            deletes,
+            formatVersion);
+    tab.newRowDelta().addDeletes(posDeletes).commit();
+
+    // Filter directly on array column: row.arr_col = array(1, 2)
+    StructLikeSet actual = actual(tableName, tab, "row.arr_col = array(1, 2)");
+    StructLikeSet expected = expected(tab, ImmutableList.of(deletes.get(0)), 
null, posDeletes);
+
+    assertThat(actual)
+        .as("Filtering position_deletes by row.arr_col = array(1, 2) should 
return matching row")
+        .isEqualTo(expected);
+    dropTable(tableName);
+  }
+
   @TestTemplate
   public void testSelect() throws IOException {
     assumeThat(formatVersion).as("DVs don't have row info in 
PositionDeletesTable").isEqualTo(2);
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index feafaff27b..184693e211 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -245,6 +245,68 @@ public class TestRewritePositionDeleteFilesProcedure 
extends ExtensionsTestBase
             EnvironmentContext.ENGINE_VERSION, v -> 
assertThat(v).startsWith("3.5"));
   }
 
+  @TestTemplate
+  public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, items 
ARRAY<STRUCT<value:BIGINT, count:INT>>) "
+            + "USING iceberg TBLPROPERTIES "
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 
'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', array(named_struct('value', cast(10 as bigint), 
'count', 1))), "
+            + "(2, 'b', array(named_struct('value', cast(20 as bigint), 
'count', 2))), "
+            + "(3, 'c', array(named_struct('value', cast(30 as bigint), 
'count', 3))), "
+            + "(4, 'd', array(named_struct('value', cast(40 as bigint), 
'count', 4))), "
+            + "(5, 'e', array(named_struct('value', cast(50 as bigint), 
'count', 5))), "
+            + "(6, 'f', array(named_struct('value', cast(60 as bigint), 
'count', 6)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
+  @TestTemplate
+  public void testRewritePositionDeletesWithMapColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, props MAP<STRING, BIGINT>) "
+            + "USING iceberg TBLPROPERTIES "
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 
'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', map('x', cast(10 as bigint))), "
+            + "(2, 'b', map('y', cast(20 as bigint))), "
+            + "(3, 'c', map('z', cast(30 as bigint))), "
+            + "(4, 'd', map('w', cast(40 as bigint))), "
+            + "(5, 'e', map('v', cast(50 as bigint))), "
+            + "(6, 'f', map('u', cast(60 as bigint)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
   private Map<String, String> snapshotSummary() {
     return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
   }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 7c5969effb..394b3d47b6 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -19,8 +19,6 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.PositionDeletesScanTask;
@@ -33,7 +31,6 @@ import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
@@ -96,12 +93,16 @@ class PositionDeletesRowReader extends 
BaseRowReader<PositionDeletesScanTask>
     InputFile inputFile = getInputFile(task.file().location());
     Preconditions.checkNotNull(inputFile, "Could not find InputFile associated 
with %s", task);
 
-    // select out constant fields when pushing down filter to row reader
+    // Retain predicates on non-constant fields for row reader filter
     Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
-    Set<Integer> nonConstantFieldIds = nonConstantFieldIds(idToConstant);
+    int[] nonConstantFieldIds =
+        expectedSchema().idToName().keySet().stream()
+            .filter(id -> !idToConstant.containsKey(id))
+            .mapToInt(Integer::intValue)
+            .toArray();
     Expression residualWithoutConstants =
         ExpressionUtil.extractByIdInclusive(
-            task.residual(), expectedSchema(), caseSensitive(), 
Ints.toArray(nonConstantFieldIds));
+            task.residual(), expectedSchema(), caseSensitive(), 
nonConstantFieldIds);
 
     if (ContentFileUtil.isDV(task.file())) {
       return new DVIterator(inputFile, task.file(), expectedSchema(), 
idToConstant);
@@ -117,12 +118,4 @@ class PositionDeletesRowReader extends 
BaseRowReader<PositionDeletesScanTask>
             idToConstant)
         .iterator();
   }
-
-  private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
-    Set<Integer> fields = expectedSchema().idToName().keySet();
-    return fields.stream()
-        .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
-        .filter(id -> !idToConstant.containsKey(id))
-        .collect(Collectors.toSet());
-  }
 }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 8032b0b782..3cfd9c848f 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -209,6 +209,60 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     dropTable(tableName);
   }
 
+  @TestTemplate
+  public void testArrayColumnFilter() throws IOException {
+    assumeThat(formatVersion)
+        .as("Row content in position_deletes is required for array column 
filter test")
+        .isEqualTo(2);
+    String tableName = "array_column_filter";
+    Schema schemaWithArray =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()),
+            Types.NestedField.optional(
+                3, "arr_col", Types.ListType.ofOptional(4, 
Types.IntegerType.get())));
+    Table tab = createTable(tableName, schemaWithArray, 
PartitionSpec.unpartitioned());
+
+    GenericRecord record1 = GenericRecord.create(schemaWithArray);
+    record1.set(0, 1);
+    record1.set(1, "a");
+    record1.set(2, ImmutableList.of(1, 2));
+    GenericRecord record2 = GenericRecord.create(schemaWithArray);
+    record2.set(0, 2);
+    record2.set(1, "b");
+    record2.set(2, ImmutableList.of(3, 4));
+    List<Record> dataRecords = ImmutableList.of(record1, record2);
+    DataFile dFile =
+        FileHelpers.writeDataFile(
+            tab,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(),
+            dataRecords);
+    tab.newAppend().appendFile(dFile).commit();
+
+    List<PositionDelete<?>> deletes =
+        ImmutableList.of(
+            positionDelete(schemaWithArray, dFile.location(), 0L, 1, "a", 
ImmutableList.of(1, 2)),
+            positionDelete(schemaWithArray, dFile.location(), 1L, 2, "b", 
ImmutableList.of(3, 4)));
+    DeleteFile posDeletes =
+        FileHelpers.writePosDeleteFile(
+            tab,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(),
+            deletes,
+            formatVersion);
+    tab.newRowDelta().addDeletes(posDeletes).commit();
+
+    // Filter directly on array column: row.arr_col = array(1, 2)
+    StructLikeSet actual = actual(tableName, tab, "row.arr_col = array(1, 2)");
+    StructLikeSet expected = expected(tab, ImmutableList.of(deletes.get(0)), 
null, posDeletes);
+
+    assertThat(actual)
+        .as("Filtering position_deletes by row.arr_col = array(1, 2) should 
return matching row")
+        .isEqualTo(expected);
+    dropTable(tableName);
+  }
+
   @TestTemplate
   public void testSelect() throws IOException {
     assumeThat(formatVersion).as("DVs don't have row info in 
PositionDeletesTable").isEqualTo(2);
diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 006379adda..d5e5aecc75 100644
--- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -245,6 +245,68 @@ public class TestRewritePositionDeleteFilesProcedure 
extends ExtensionsTestBase
             EnvironmentContext.ENGINE_VERSION, v -> 
assertThat(v).startsWith("4.0"));
   }
 
+  @TestTemplate
+  public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, items 
ARRAY<STRUCT<value:BIGINT, count:INT>>) "
+            + "USING iceberg TBLPROPERTIES "
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 
'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', array(named_struct('value', cast(10 as bigint), 
'count', 1))), "
+            + "(2, 'b', array(named_struct('value', cast(20 as bigint), 
'count', 2))), "
+            + "(3, 'c', array(named_struct('value', cast(30 as bigint), 
'count', 3))), "
+            + "(4, 'd', array(named_struct('value', cast(40 as bigint), 
'count', 4))), "
+            + "(5, 'e', array(named_struct('value', cast(50 as bigint), 
'count', 5))), "
+            + "(6, 'f', array(named_struct('value', cast(60 as bigint), 
'count', 6)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
+  @TestTemplate
+  public void testRewritePositionDeletesWithMapColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, props MAP<STRING, BIGINT>) "
+            + "USING iceberg TBLPROPERTIES "
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 
'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', map('x', cast(10 as bigint))), "
+            + "(2, 'b', map('y', cast(20 as bigint))), "
+            + "(3, 'c', map('z', cast(30 as bigint))), "
+            + "(4, 'd', map('w', cast(40 as bigint))), "
+            + "(5, 'e', map('v', cast(50 as bigint))), "
+            + "(6, 'f', map('u', cast(60 as bigint)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
   private Map<String, String> snapshotSummary() {
     return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
   }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 7c5969effb..394b3d47b6 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -19,8 +19,6 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.PositionDeletesScanTask;
@@ -33,7 +31,6 @@ import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
@@ -96,12 +93,16 @@ class PositionDeletesRowReader extends 
BaseRowReader<PositionDeletesScanTask>
     InputFile inputFile = getInputFile(task.file().location());
     Preconditions.checkNotNull(inputFile, "Could not find InputFile associated 
with %s", task);
 
-    // select out constant fields when pushing down filter to row reader
+    // Retain predicates on non-constant fields for row reader filter
     Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
-    Set<Integer> nonConstantFieldIds = nonConstantFieldIds(idToConstant);
+    int[] nonConstantFieldIds =
+        expectedSchema().idToName().keySet().stream()
+            .filter(id -> !idToConstant.containsKey(id))
+            .mapToInt(Integer::intValue)
+            .toArray();
     Expression residualWithoutConstants =
         ExpressionUtil.extractByIdInclusive(
-            task.residual(), expectedSchema(), caseSensitive(), 
Ints.toArray(nonConstantFieldIds));
+            task.residual(), expectedSchema(), caseSensitive(), 
nonConstantFieldIds);
 
     if (ContentFileUtil.isDV(task.file())) {
       return new DVIterator(inputFile, task.file(), expectedSchema(), 
idToConstant);
@@ -117,12 +118,4 @@ class PositionDeletesRowReader extends 
BaseRowReader<PositionDeletesScanTask>
             idToConstant)
         .iterator();
   }
-
-  private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
-    Set<Integer> fields = expectedSchema().idToName().keySet();
-    return fields.stream()
-        .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
-        .filter(id -> !idToConstant.containsKey(id))
-        .collect(Collectors.toSet());
-  }
 }
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index f5456db8e4..f9a2fea30c 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -209,6 +209,60 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     dropTable(tableName);
   }
 
+  @TestTemplate
+  public void testArrayColumnFilter() throws IOException {
+    assumeThat(formatVersion)
+        .as("Row content in position_deletes is required for array column 
filter test")
+        .isEqualTo(2);
+    String tableName = "array_column_filter";
+    Schema schemaWithArray =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()),
+            Types.NestedField.optional(
+                3, "arr_col", Types.ListType.ofOptional(4, 
Types.IntegerType.get())));
+    Table tab = createTable(tableName, schemaWithArray, 
PartitionSpec.unpartitioned());
+
+    GenericRecord record1 = GenericRecord.create(schemaWithArray);
+    record1.set(0, 1);
+    record1.set(1, "a");
+    record1.set(2, ImmutableList.of(1, 2));
+    GenericRecord record2 = GenericRecord.create(schemaWithArray);
+    record2.set(0, 2);
+    record2.set(1, "b");
+    record2.set(2, ImmutableList.of(3, 4));
+    List<Record> dataRecords = ImmutableList.of(record1, record2);
+    DataFile dFile =
+        FileHelpers.writeDataFile(
+            tab,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(),
+            dataRecords);
+    tab.newAppend().appendFile(dFile).commit();
+
+    List<PositionDelete<?>> deletes =
+        ImmutableList.of(
+            positionDelete(schemaWithArray, dFile.location(), 0L, 1, "a", 
ImmutableList.of(1, 2)),
+            positionDelete(schemaWithArray, dFile.location(), 1L, 2, "b", 
ImmutableList.of(3, 4)));
+    DeleteFile posDeletes =
+        FileHelpers.writePosDeleteFile(
+            tab,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(),
+            deletes,
+            formatVersion);
+    tab.newRowDelta().addDeletes(posDeletes).commit();
+
+    // Filter directly on array column: row.arr_col = array(1, 2)
+    StructLikeSet actual = actual(tableName, tab, "row.arr_col = array(1, 2)");
+    StructLikeSet expected = expected(tab, ImmutableList.of(deletes.get(0)), 
null, posDeletes);
+
+    assertThat(actual)
+        .as("Filtering position_deletes by row.arr_col = array(1, 2) should 
return matching row")
+        .isEqualTo(expected);
+    dropTable(tableName);
+  }
+
   @TestTemplate
   public void testSelect() throws IOException {
     assumeThat(formatVersion).as("DVs don't have row info in 
PositionDeletesTable").isEqualTo(2);

Reply via email to