This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 32b2f00125 Spark: Replicate position delete array/map fix to Spark 3.4, 3.5, and 4.0 (#15743)
32b2f00125 is described below
commit 32b2f00125261b93c29aca9068d78390135a80f1
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Mar 23 14:32:39 2026 -0700
Spark: Replicate position delete array/map fix to Spark 3.4, 3.5, and 4.0 (#15743)
Follow-up to #15632, which updated Spark 4.1 only. Port the same
PositionDeletesRowReader residual extraction (retain all non-constant field IDs
for extractByIdInclusive, not only primitives) and the matching
rewrite_position_delete_files and position_deletes metadata tests to the Spark
3.4, 3.5, and 4.0 modules.
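
For context, a minimal sketch of the behavior being ported (the class name, the empty idToConstant map, and the printed ID orderings are illustrative assumptions; the stream pipeline mirrors the diff below). With a schema containing a list column, the old primitive-only filter omits the list field's own ID, so a residual predicate on that column is stripped by ExpressionUtil.extractByIdInclusive before it reaches the row reader; keeping every non-constant ID preserves it:

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class NonConstantFieldIdsSketch {
      public static void main(String[] args) {
        // id (1) is primitive; arr_col (3) is a list whose element field has ID 4.
        Schema schema =
            new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(
                    3, "arr_col", Types.ListType.ofOptional(4, Types.IntegerType.get())));
        Map<Integer, Object> idToConstant = Map.of(); // no constant fields in this sketch

        // Old selection: primitive, non-constant fields only. The list field 3 is
        // excluded (a list is not a primitive type), so a residual such as
        // `arr_col = array(1, 2)` referencing field 3 is dropped.
        int[] oldIds =
            schema.idToName().keySet().stream()
                .filter(id -> schema.findField(id).type().isPrimitiveType())
                .filter(id -> !idToConstant.containsKey(id))
                .mapToInt(Integer::intValue)
                .toArray();
        System.out.println("old: " + Arrays.toString(oldIds)); // e.g. [1, 4]

        // New selection: every non-constant field ID, complex types included,
        // so predicates on array/map columns survive residual extraction.
        int[] newIds =
            schema.idToName().keySet().stream()
                .filter(id -> !idToConstant.containsKey(id))
                .mapToInt(Integer::intValue)
                .toArray();
        System.out.println("new: " + Arrays.toString(newIds)); // e.g. [1, 3, 4]
      }
    }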
---
.../TestRewritePositionDeleteFilesProcedure.java | 62 ++++++++++++++++++++++
.../spark/source/PositionDeletesRowReader.java | 21 +++-----
.../spark/source/TestPositionDeletesTable.java | 54 +++++++++++++++++++
.../TestRewritePositionDeleteFilesProcedure.java | 62 ++++++++++++++++++++++
.../spark/source/PositionDeletesRowReader.java | 21 +++-----
.../spark/source/TestPositionDeletesTable.java | 54 +++++++++++++++++++
.../TestRewritePositionDeleteFilesProcedure.java | 62 ++++++++++++++++++++++
.../spark/source/PositionDeletesRowReader.java | 21 +++-----
.../spark/source/TestPositionDeletesTable.java | 54 +++++++++++++++++++
9 files changed, 369 insertions(+), 42 deletions(-)
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 0ff3a949ae..e5daaf65d7 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -245,6 +245,68 @@ public class TestRewritePositionDeleteFilesProcedure extends ExtensionsTestBase
EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("3.4"));
}
+ @TestTemplate
+ public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, items
ARRAY<STRUCT<value:BIGINT, count:INT>>) "
+ + "USING iceberg TBLPROPERTIES "
+ + "('format-version'='2', 'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read')",
+ tableName);
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, 'a', array(named_struct('value', cast(10 as bigint),
'count', 1))), "
+ + "(2, 'b', array(named_struct('value', cast(20 as bigint),
'count', 2))), "
+ + "(3, 'c', array(named_struct('value', cast(30 as bigint),
'count', 3))), "
+ + "(4, 'd', array(named_struct('value', cast(40 as bigint),
'count', 4))), "
+ + "(5, 'e', array(named_struct('value', cast(50 as bigint),
'count', 5))), "
+ + "(6, 'f', array(named_struct('value', cast(60 as bigint),
'count', 6)))",
+ tableName);
+
+ sql("DELETE FROM %s WHERE id = 1", tableName);
+ sql("DELETE FROM %s WHERE id = 2", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+ sql(
+ "CALL %s.system.rewrite_position_delete_files("
+ + "table => '%s',"
+ + "options => map('rewrite-all','true'))",
+ catalogName, tableIdent);
+ }
+
+ @TestTemplate
+ public void testRewritePositionDeletesWithMapColumns() throws Exception {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, props MAP<STRING, BIGINT>) "
+ + "USING iceberg TBLPROPERTIES "
+ + "('format-version'='2', 'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read')",
+ tableName);
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, 'a', map('x', cast(10 as bigint))), "
+ + "(2, 'b', map('y', cast(20 as bigint))), "
+ + "(3, 'c', map('z', cast(30 as bigint))), "
+ + "(4, 'd', map('w', cast(40 as bigint))), "
+ + "(5, 'e', map('v', cast(50 as bigint))), "
+ + "(6, 'f', map('u', cast(60 as bigint)))",
+ tableName);
+
+ sql("DELETE FROM %s WHERE id = 1", tableName);
+ sql("DELETE FROM %s WHERE id = 2", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+ sql(
+ "CALL %s.system.rewrite_position_delete_files("
+ + "table => '%s',"
+ + "options => map('rewrite-all','true'))",
+ catalogName, tableIdent);
+ }
+
private Map<String, String> snapshotSummary() {
return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 8ad1f3ad39..9454808f00 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -19,8 +19,6 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.PositionDeletesScanTask;
@@ -32,7 +30,6 @@ import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
@@ -88,12 +85,16 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
InputFile inputFile = getInputFile(task.file().location());
Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with %s", task);
- // select out constant fields when pushing down filter to row reader
+ // Retain predicates on non-constant fields for row reader filter
Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
- Set<Integer> nonConstantFieldIds = nonConstantFieldIds(idToConstant);
+ int[] nonConstantFieldIds =
+ expectedSchema().idToName().keySet().stream()
+ .filter(id -> !idToConstant.containsKey(id))
+ .mapToInt(Integer::intValue)
+ .toArray();
Expression residualWithoutConstants =
ExpressionUtil.extractByIdInclusive(
- task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
+ task.residual(), expectedSchema(), caseSensitive(), nonConstantFieldIds);
if (ContentFileUtil.isDV(task.file())) {
return new DVIterator(inputFile, task.file(), expectedSchema(), idToConstant);
@@ -109,12 +110,4 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
idToConstant)
.iterator();
}
-
- private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
- Set<Integer> fields = expectedSchema().idToName().keySet();
- return fields.stream()
- .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
- .filter(id -> !idToConstant.containsKey(id))
- .collect(Collectors.toSet());
- }
}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 81954c19b5..0a5e064ace 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -209,6 +209,60 @@ public class TestPositionDeletesTable extends CatalogTestBase {
dropTable(tableName);
}
+ @TestTemplate
+ public void testArrayColumnFilter() throws IOException {
+ assumeThat(formatVersion)
+ .as("Row content in position_deletes is required for array column
filter test")
+ .isEqualTo(2);
+ String tableName = "array_column_filter";
+ Schema schemaWithArray =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(
+ 3, "arr_col", Types.ListType.ofOptional(4,
Types.IntegerType.get())));
+ Table tab = createTable(tableName, schemaWithArray,
PartitionSpec.unpartitioned());
+
+ GenericRecord record1 = GenericRecord.create(schemaWithArray);
+ record1.set(0, 1);
+ record1.set(1, "a");
+ record1.set(2, ImmutableList.of(1, 2));
+ GenericRecord record2 = GenericRecord.create(schemaWithArray);
+ record2.set(0, 2);
+ record2.set(1, "b");
+ record2.set(2, ImmutableList.of(3, 4));
+ List<Record> dataRecords = ImmutableList.of(record1, record2);
+ DataFile dFile =
+ FileHelpers.writeDataFile(
+ tab,
+ Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+ TestHelpers.Row.of(),
+ dataRecords);
+ tab.newAppend().appendFile(dFile).commit();
+
+ List<PositionDelete<?>> deletes =
+ ImmutableList.of(
+ positionDelete(schemaWithArray, dFile.location(), 0L, 1, "a", ImmutableList.of(1, 2)),
+ positionDelete(schemaWithArray, dFile.location(), 1L, 2, "b", ImmutableList.of(3, 4)));
+ DeleteFile posDeletes =
+ FileHelpers.writePosDeleteFile(
+ tab,
+ Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+ TestHelpers.Row.of(),
+ deletes,
+ formatVersion);
+ tab.newRowDelta().addDeletes(posDeletes).commit();
+
+ // Filter directly on array column: row.arr_col = array(1, 2)
+ StructLikeSet actual = actual(tableName, tab, "row.arr_col = array(1, 2)");
+ StructLikeSet expected = expected(tab, ImmutableList.of(deletes.get(0)), null, posDeletes);
+
+ assertThat(actual)
+ .as("Filtering position_deletes by row.arr_col = array(1, 2) should
return matching row")
+ .isEqualTo(expected);
+ dropTable(tableName);
+ }
+
@TestTemplate
public void testSelect() throws IOException {
assumeThat(formatVersion).as("DVs don't have row info in PositionDeletesTable").isEqualTo(2);
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index feafaff27b..184693e211 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -245,6 +245,68 @@ public class TestRewritePositionDeleteFilesProcedure extends ExtensionsTestBase
EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("3.5"));
}
+ @TestTemplate
+ public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, items
ARRAY<STRUCT<value:BIGINT, count:INT>>) "
+ + "USING iceberg TBLPROPERTIES "
+ + "('format-version'='2', 'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read')",
+ tableName);
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, 'a', array(named_struct('value', cast(10 as bigint),
'count', 1))), "
+ + "(2, 'b', array(named_struct('value', cast(20 as bigint),
'count', 2))), "
+ + "(3, 'c', array(named_struct('value', cast(30 as bigint),
'count', 3))), "
+ + "(4, 'd', array(named_struct('value', cast(40 as bigint),
'count', 4))), "
+ + "(5, 'e', array(named_struct('value', cast(50 as bigint),
'count', 5))), "
+ + "(6, 'f', array(named_struct('value', cast(60 as bigint),
'count', 6)))",
+ tableName);
+
+ sql("DELETE FROM %s WHERE id = 1", tableName);
+ sql("DELETE FROM %s WHERE id = 2", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+ sql(
+ "CALL %s.system.rewrite_position_delete_files("
+ + "table => '%s',"
+ + "options => map('rewrite-all','true'))",
+ catalogName, tableIdent);
+ }
+
+ @TestTemplate
+ public void testRewritePositionDeletesWithMapColumns() throws Exception {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, props MAP<STRING, BIGINT>) "
+ + "USING iceberg TBLPROPERTIES "
+ + "('format-version'='2', 'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read')",
+ tableName);
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, 'a', map('x', cast(10 as bigint))), "
+ + "(2, 'b', map('y', cast(20 as bigint))), "
+ + "(3, 'c', map('z', cast(30 as bigint))), "
+ + "(4, 'd', map('w', cast(40 as bigint))), "
+ + "(5, 'e', map('v', cast(50 as bigint))), "
+ + "(6, 'f', map('u', cast(60 as bigint)))",
+ tableName);
+
+ sql("DELETE FROM %s WHERE id = 1", tableName);
+ sql("DELETE FROM %s WHERE id = 2", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+ sql(
+ "CALL %s.system.rewrite_position_delete_files("
+ + "table => '%s',"
+ + "options => map('rewrite-all','true'))",
+ catalogName, tableIdent);
+ }
+
private Map<String, String> snapshotSummary() {
return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 7c5969effb..394b3d47b6 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -19,8 +19,6 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.PositionDeletesScanTask;
@@ -33,7 +31,6 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
@@ -96,12 +93,16 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
InputFile inputFile = getInputFile(task.file().location());
Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with %s", task);
- // select out constant fields when pushing down filter to row reader
+ // Retain predicates on non-constant fields for row reader filter
Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
- Set<Integer> nonConstantFieldIds = nonConstantFieldIds(idToConstant);
+ int[] nonConstantFieldIds =
+ expectedSchema().idToName().keySet().stream()
+ .filter(id -> !idToConstant.containsKey(id))
+ .mapToInt(Integer::intValue)
+ .toArray();
Expression residualWithoutConstants =
ExpressionUtil.extractByIdInclusive(
- task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
+ task.residual(), expectedSchema(), caseSensitive(), nonConstantFieldIds);
if (ContentFileUtil.isDV(task.file())) {
return new DVIterator(inputFile, task.file(), expectedSchema(), idToConstant);
@@ -117,12 +118,4 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
idToConstant)
.iterator();
}
-
- private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
- Set<Integer> fields = expectedSchema().idToName().keySet();
- return fields.stream()
- .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
- .filter(id -> !idToConstant.containsKey(id))
- .collect(Collectors.toSet());
- }
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 8032b0b782..3cfd9c848f 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -209,6 +209,60 @@ public class TestPositionDeletesTable extends CatalogTestBase {
dropTable(tableName);
}
+ @TestTemplate
+ public void testArrayColumnFilter() throws IOException {
+ assumeThat(formatVersion)
+ .as("Row content in position_deletes is required for array column
filter test")
+ .isEqualTo(2);
+ String tableName = "array_column_filter";
+ Schema schemaWithArray =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(
+ 3, "arr_col", Types.ListType.ofOptional(4,
Types.IntegerType.get())));
+ Table tab = createTable(tableName, schemaWithArray,
PartitionSpec.unpartitioned());
+
+ GenericRecord record1 = GenericRecord.create(schemaWithArray);
+ record1.set(0, 1);
+ record1.set(1, "a");
+ record1.set(2, ImmutableList.of(1, 2));
+ GenericRecord record2 = GenericRecord.create(schemaWithArray);
+ record2.set(0, 2);
+ record2.set(1, "b");
+ record2.set(2, ImmutableList.of(3, 4));
+ List<Record> dataRecords = ImmutableList.of(record1, record2);
+ DataFile dFile =
+ FileHelpers.writeDataFile(
+ tab,
+ Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+ TestHelpers.Row.of(),
+ dataRecords);
+ tab.newAppend().appendFile(dFile).commit();
+
+ List<PositionDelete<?>> deletes =
+ ImmutableList.of(
+ positionDelete(schemaWithArray, dFile.location(), 0L, 1, "a", ImmutableList.of(1, 2)),
+ positionDelete(schemaWithArray, dFile.location(), 1L, 2, "b", ImmutableList.of(3, 4)));
+ DeleteFile posDeletes =
+ FileHelpers.writePosDeleteFile(
+ tab,
+ Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+ TestHelpers.Row.of(),
+ deletes,
+ formatVersion);
+ tab.newRowDelta().addDeletes(posDeletes).commit();
+
+ // Filter directly on array column: row.arr_col = array(1, 2)
+ StructLikeSet actual = actual(tableName, tab, "row.arr_col = array(1, 2)");
+ StructLikeSet expected = expected(tab, ImmutableList.of(deletes.get(0)), null, posDeletes);
+
+ assertThat(actual)
+ .as("Filtering position_deletes by row.arr_col = array(1, 2) should
return matching row")
+ .isEqualTo(expected);
+ dropTable(tableName);
+ }
+
@TestTemplate
public void testSelect() throws IOException {
assumeThat(formatVersion).as("DVs don't have row info in PositionDeletesTable").isEqualTo(2);
diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 006379adda..d5e5aecc75 100644
--- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -245,6 +245,68 @@ public class TestRewritePositionDeleteFilesProcedure extends ExtensionsTestBase
EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("4.0"));
}
+ @TestTemplate
+ public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, items
ARRAY<STRUCT<value:BIGINT, count:INT>>) "
+ + "USING iceberg TBLPROPERTIES "
+ + "('format-version'='2', 'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read')",
+ tableName);
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, 'a', array(named_struct('value', cast(10 as bigint),
'count', 1))), "
+ + "(2, 'b', array(named_struct('value', cast(20 as bigint),
'count', 2))), "
+ + "(3, 'c', array(named_struct('value', cast(30 as bigint),
'count', 3))), "
+ + "(4, 'd', array(named_struct('value', cast(40 as bigint),
'count', 4))), "
+ + "(5, 'e', array(named_struct('value', cast(50 as bigint),
'count', 5))), "
+ + "(6, 'f', array(named_struct('value', cast(60 as bigint),
'count', 6)))",
+ tableName);
+
+ sql("DELETE FROM %s WHERE id = 1", tableName);
+ sql("DELETE FROM %s WHERE id = 2", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+ sql(
+ "CALL %s.system.rewrite_position_delete_files("
+ + "table => '%s',"
+ + "options => map('rewrite-all','true'))",
+ catalogName, tableIdent);
+ }
+
+ @TestTemplate
+ public void testRewritePositionDeletesWithMapColumns() throws Exception {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, props MAP<STRING, BIGINT>) "
+ + "USING iceberg TBLPROPERTIES "
+ + "('format-version'='2', 'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read')",
+ tableName);
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, 'a', map('x', cast(10 as bigint))), "
+ + "(2, 'b', map('y', cast(20 as bigint))), "
+ + "(3, 'c', map('z', cast(30 as bigint))), "
+ + "(4, 'd', map('w', cast(40 as bigint))), "
+ + "(5, 'e', map('v', cast(50 as bigint))), "
+ + "(6, 'f', map('u', cast(60 as bigint)))",
+ tableName);
+
+ sql("DELETE FROM %s WHERE id = 1", tableName);
+ sql("DELETE FROM %s WHERE id = 2", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+ sql(
+ "CALL %s.system.rewrite_position_delete_files("
+ + "table => '%s',"
+ + "options => map('rewrite-all','true'))",
+ catalogName, tableIdent);
+ }
+
private Map<String, String> snapshotSummary() {
return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 7c5969effb..394b3d47b6 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -19,8 +19,6 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.PositionDeletesScanTask;
@@ -33,7 +31,6 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
@@ -96,12 +93,16 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
InputFile inputFile = getInputFile(task.file().location());
Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with %s", task);
- // select out constant fields when pushing down filter to row reader
+ // Retain predicates on non-constant fields for row reader filter
Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
- Set<Integer> nonConstantFieldIds = nonConstantFieldIds(idToConstant);
+ int[] nonConstantFieldIds =
+ expectedSchema().idToName().keySet().stream()
+ .filter(id -> !idToConstant.containsKey(id))
+ .mapToInt(Integer::intValue)
+ .toArray();
Expression residualWithoutConstants =
ExpressionUtil.extractByIdInclusive(
- task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
+ task.residual(), expectedSchema(), caseSensitive(), nonConstantFieldIds);
if (ContentFileUtil.isDV(task.file())) {
return new DVIterator(inputFile, task.file(), expectedSchema(), idToConstant);
@@ -117,12 +118,4 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
idToConstant)
.iterator();
}
-
- private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
- Set<Integer> fields = expectedSchema().idToName().keySet();
- return fields.stream()
- .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
- .filter(id -> !idToConstant.containsKey(id))
- .collect(Collectors.toSet());
- }
}
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index f5456db8e4..f9a2fea30c 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -209,6 +209,60 @@ public class TestPositionDeletesTable extends CatalogTestBase {
dropTable(tableName);
}
+ @TestTemplate
+ public void testArrayColumnFilter() throws IOException {
+ assumeThat(formatVersion)
+ .as("Row content in position_deletes is required for array column
filter test")
+ .isEqualTo(2);
+ String tableName = "array_column_filter";
+ Schema schemaWithArray =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(
+ 3, "arr_col", Types.ListType.ofOptional(4,
Types.IntegerType.get())));
+ Table tab = createTable(tableName, schemaWithArray,
PartitionSpec.unpartitioned());
+
+ GenericRecord record1 = GenericRecord.create(schemaWithArray);
+ record1.set(0, 1);
+ record1.set(1, "a");
+ record1.set(2, ImmutableList.of(1, 2));
+ GenericRecord record2 = GenericRecord.create(schemaWithArray);
+ record2.set(0, 2);
+ record2.set(1, "b");
+ record2.set(2, ImmutableList.of(3, 4));
+ List<Record> dataRecords = ImmutableList.of(record1, record2);
+ DataFile dFile =
+ FileHelpers.writeDataFile(
+ tab,
+ Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+ TestHelpers.Row.of(),
+ dataRecords);
+ tab.newAppend().appendFile(dFile).commit();
+
+ List<PositionDelete<?>> deletes =
+ ImmutableList.of(
+ positionDelete(schemaWithArray, dFile.location(), 0L, 1, "a", ImmutableList.of(1, 2)),
+ positionDelete(schemaWithArray, dFile.location(), 1L, 2, "b", ImmutableList.of(3, 4)));
+ DeleteFile posDeletes =
+ FileHelpers.writePosDeleteFile(
+ tab,
+ Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+ TestHelpers.Row.of(),
+ deletes,
+ formatVersion);
+ tab.newRowDelta().addDeletes(posDeletes).commit();
+
+ // Filter directly on array column: row.arr_col = array(1, 2)
+ StructLikeSet actual = actual(tableName, tab, "row.arr_col = array(1, 2)");
+ StructLikeSet expected = expected(tab, ImmutableList.of(deletes.get(0)), null, posDeletes);
+
+ assertThat(actual)
+ .as("Filtering position_deletes by row.arr_col = array(1, 2) should
return matching row")
+ .isEqualTo(expected);
+ dropTable(tableName);
+ }
+
@TestTemplate
public void testSelect() throws IOException {
assumeThat(formatVersion).as("DVs don't have row info in PositionDeletesTable").isEqualTo(2);