This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 60a6e7324c Core: Fix rewrite_position_delete_files failure with array/map columns (#15632)
60a6e7324c is described below
commit 60a6e7324c4b6b234882f9d464c9ddf6cbcb6bd3
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Mar 23 11:50:56 2026 -0700
Core: Fix rewrite_position_delete_files failure with array/map columns (#15632)
---
.../apache/iceberg/expressions/ExpressionUtil.java | 93 +++++++++---
.../iceberg/expressions/TestExpressionUtil.java | 158 +++++++++++++++++++++
.../TestRewritePositionDeleteFilesProcedure.java | 62 ++++++++
.../spark/source/PositionDeletesRowReader.java | 21 +--
.../spark/source/TestPositionDeletesTable.java | 54 +++++++
5 files changed, 354 insertions(+), 34 deletions(-)
diff --git
a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
index 9502e1d5bd..af24ce40ca 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
@@ -24,6 +24,7 @@ import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Locale;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Pattern;
@@ -145,21 +146,28 @@ public class ExpressionUtil {
}
/**
- * Extracts an expression that references only the given column IDs from the
given expression.
+ * Returns an expression that retains only predicates which reference one of
the given field IDs.
*
- * <p>The result is inclusive. If a row would match the original filter, it
must match the result
- * filter.
- *
- * @param expression a filter Expression
- * @param schema a Schema
+ * @param expression a filter expression
+ * @param schema schema for binding references
* @param caseSensitive whether binding is case sensitive
- * @param ids field IDs used to match predicates to extract from the
expression
- * @return an Expression that selects at least the same rows as the original
using only the IDs
+ * @param ids field IDs to retain predicates for
+ * @return expression containing only predicates that reference the given IDs
*/
public static Expression extractByIdInclusive(
Expression expression, Schema schema, boolean caseSensitive, int... ids)
{
- PartitionSpec spec = identitySpec(schema, ids);
- return Projections.inclusive(spec,
caseSensitive).project(Expressions.rewriteNot(expression));
+ if (ids == null || ids.length == 0) {
+ return Expressions.alwaysTrue();
+ }
+
+ ImmutableSet.Builder<Integer> retainIds = ImmutableSet.builder();
+ for (int id : ids) {
+ retainIds.add(id);
+ }
+
+ return ExpressionVisitors.visit(
+ Expressions.rewriteNot(expression),
+ new RetainPredicatesByFieldIdVisitor(schema, caseSensitive,
retainIds.build()));
}
/**
@@ -262,6 +270,61 @@ public class ExpressionUtil {
throw new UnsupportedOperationException("Cannot unbind unsupported term: "
+ term);
}
+ private static class RetainPredicatesByFieldIdVisitor
+ extends ExpressionVisitors.ExpressionVisitor<Expression> {
+ private final Schema schema;
+ private final boolean caseSensitive;
+ private final Set<Integer> retainFieldIds;
+
+ RetainPredicatesByFieldIdVisitor(
+ Schema schema, boolean caseSensitive, Set<Integer> retainFieldIds) {
+ this.schema = schema;
+ this.caseSensitive = caseSensitive;
+ this.retainFieldIds = retainFieldIds;
+ }
+
+ @Override
+ public Expression alwaysTrue() {
+ return Expressions.alwaysTrue();
+ }
+
+ @Override
+ public Expression alwaysFalse() {
+ return Expressions.alwaysFalse();
+ }
+
+ @Override
+ public Expression not(Expression result) {
+ return Expressions.not(result);
+ }
+
+ @Override
+ public Expression and(Expression leftResult, Expression rightResult) {
+ return Expressions.and(leftResult, rightResult);
+ }
+
+ @Override
+ public Expression or(Expression leftResult, Expression rightResult) {
+ return Expressions.or(leftResult, rightResult);
+ }
+
+ @Override
+ public <T> Expression predicate(BoundPredicate<T> pred) {
+ return retainFieldIds.contains(pred.ref().fieldId()) ? pred :
Expressions.alwaysTrue();
+ }
+
+ @Override
+ public <T> Expression predicate(UnboundPredicate<T> pred) {
+ Expression bound = Binder.bind(schema.asStruct(), pred, caseSensitive);
+ if (bound instanceof BoundPredicate) {
+ return retainFieldIds.contains(((BoundPredicate<?>)
bound).ref().fieldId())
+ ? pred
+ : Expressions.alwaysTrue();
+ }
+ return Expressions.alwaysTrue();
+ }
+ }
+
private static class ExpressionSanitizer
extends ExpressionVisitors.ExpressionVisitor<Expression> {
private final long now;
@@ -697,14 +760,4 @@ public class ExpressionUtil {
}
return builder.toString();
}
-
- private static PartitionSpec identitySpec(Schema schema, int... ids) {
- PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema);
-
- for (int id : ids) {
- specBuilder.identity(schema.findColumnName(id));
- }
-
- return specBuilder.build();
- }
}
diff --git
a/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java
b/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java
index a528142188..fdf3d9dcd1 100644
--- a/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java
+++ b/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java
@@ -69,6 +69,24 @@ public class TestExpressionUtil {
private static final Types.StructType FLOAT_TEST =
Types.StructType.of(Types.NestedField.optional(1, "test",
Types.FloatType.get()));
+ /** Schema with struct, list, and map columns for {@link
#testExtractByIdInclusiveNestedTypes}. */
+ private static final Schema NESTED_EXTRACT_SCHEMA =
+ new Schema(
+ Types.NestedField.required(1, "top_id", Types.LongType.get()),
+ Types.NestedField.optional(
+ 2,
+ "st",
+ Types.StructType.of(
+ Types.NestedField.required(3, "inner_i",
Types.IntegerType.get()))),
+ Types.NestedField.optional(
+ 4, "arr", Types.ListType.ofRequired(5, Types.IntegerType.get())),
+ Types.NestedField.optional(
+ 6,
+ "mp",
+ Types.MapType.ofRequired(7, 8, Types.StringType.get(),
Types.IntegerType.get())));
+
+ private static final Types.StructType NESTED_EXTRACT_STRUCT =
NESTED_EXTRACT_SCHEMA.asStruct();
+
@Test
public void testUnchangedUnaryPredicates() {
for (Expression unary :
@@ -825,6 +843,146 @@ public class TestExpressionUtil {
}
}
+ @Test
+ public void testExtractByIdInclusive() {
+ Expression alwaysTrue = Expressions.alwaysTrue();
+ Expression idEq = Expressions.equal("id", 5L);
+ Expression valEq = Expressions.equal("val", 5);
+
+ assertThat(
+ ExpressionUtil.equivalent(
+ alwaysTrue,
+ ExpressionUtil.extractByIdInclusive(
+ Expressions.and(idEq, valEq), SCHEMA, true, new int[0]),
+ STRUCT,
+ true))
+ .isTrue();
+
+ assertThat(
+ ExpressionUtil.equivalent(
+ alwaysTrue,
+ ExpressionUtil.extractByIdInclusive(
+ Expressions.and(idEq, valEq), SCHEMA, true, (int[]) null),
+ STRUCT,
+ true))
+ .isTrue();
+
+ assertThat(
+ ExpressionUtil.equivalent(
+ idEq, ExpressionUtil.extractByIdInclusive(idEq, SCHEMA, true,
1), STRUCT, true))
+ .isTrue();
+
+ assertThat(
+ ExpressionUtil.equivalent(
+ alwaysTrue,
+ ExpressionUtil.extractByIdInclusive(valEq, SCHEMA, true, 1),
+ STRUCT,
+ true))
+ .isTrue();
+
+ assertThat(
+ ExpressionUtil.equivalent(
+ idEq,
+ ExpressionUtil.extractByIdInclusive(Expressions.and(idEq,
valEq), SCHEMA, true, 1),
+ STRUCT,
+ true))
+ .isTrue();
+
+ Expression orBothId = Expressions.or(Expressions.equal("id", 1L),
Expressions.equal("id", 2L));
+ assertThat(
+ ExpressionUtil.equivalent(
+ orBothId,
+ ExpressionUtil.extractByIdInclusive(orBothId, SCHEMA, true, 1),
+ STRUCT,
+ true))
+ .isTrue();
+ }
+
+ @Test
+ public void testExtractByIdInclusiveNestedTypes() {
+ Expression alwaysTrue = Expressions.alwaysTrue();
+ Expression structPred = Expressions.equal("st.inner_i", 1);
+ Expression listPred = Expressions.equal("arr.element", 42);
+ Expression mapKeyPred = Expressions.equal("mp.key", "k");
+ Expression mapValuePred = Expressions.equal("mp.value", 7);
+ Expression topPred = Expressions.equal("top_id", 9L);
+
+ assertThat(
+ ExpressionUtil.equivalent(
+ structPred,
+ ExpressionUtil.extractByIdInclusive(structPred,
NESTED_EXTRACT_SCHEMA, true, 3),
+ NESTED_EXTRACT_STRUCT,
+ true))
+ .isTrue();
+ assertThat(
+ ExpressionUtil.equivalent(
+ alwaysTrue,
+ ExpressionUtil.extractByIdInclusive(structPred,
NESTED_EXTRACT_SCHEMA, true, 1),
+ NESTED_EXTRACT_STRUCT,
+ true))
+ .isTrue();
+
+ assertThat(
+ ExpressionUtil.equivalent(
+ listPred,
+ ExpressionUtil.extractByIdInclusive(listPred,
NESTED_EXTRACT_SCHEMA, true, 5),
+ NESTED_EXTRACT_STRUCT,
+ true))
+ .isTrue();
+ assertThat(
+ ExpressionUtil.equivalent(
+ alwaysTrue,
+ ExpressionUtil.extractByIdInclusive(listPred,
NESTED_EXTRACT_SCHEMA, true, 1),
+ NESTED_EXTRACT_STRUCT,
+ true))
+ .isTrue();
+
+ assertThat(
+ ExpressionUtil.equivalent(
+ mapKeyPred,
+ ExpressionUtil.extractByIdInclusive(mapKeyPred,
NESTED_EXTRACT_SCHEMA, true, 7),
+ NESTED_EXTRACT_STRUCT,
+ true))
+ .isTrue();
+ assertThat(
+ ExpressionUtil.equivalent(
+ mapValuePred,
+ ExpressionUtil.extractByIdInclusive(mapValuePred,
NESTED_EXTRACT_SCHEMA, true, 8),
+ NESTED_EXTRACT_STRUCT,
+ true))
+ .isTrue();
+ assertThat(
+ ExpressionUtil.equivalent(
+ alwaysTrue,
+ ExpressionUtil.extractByIdInclusive(mapKeyPred,
NESTED_EXTRACT_SCHEMA, true, 8),
+ NESTED_EXTRACT_STRUCT,
+ true))
+ .isTrue();
+
+ Expression mixed = Expressions.and(structPred, Expressions.and(listPred,
topPred));
+ assertThat(
+ ExpressionUtil.equivalent(
+ structPred,
+ ExpressionUtil.extractByIdInclusive(mixed,
NESTED_EXTRACT_SCHEMA, true, 3),
+ NESTED_EXTRACT_STRUCT,
+ true))
+ .isTrue();
+ assertThat(
+ ExpressionUtil.equivalent(
+ listPred,
+ ExpressionUtil.extractByIdInclusive(mixed,
NESTED_EXTRACT_SCHEMA, true, 5),
+ NESTED_EXTRACT_STRUCT,
+ true))
+ .isTrue();
+ assertThat(
+ ExpressionUtil.equivalent(
+ topPred,
+ ExpressionUtil.extractByIdInclusive(mixed,
NESTED_EXTRACT_SCHEMA, true, 1),
+ NESTED_EXTRACT_STRUCT,
+ true))
+ .isTrue();
+ }
+
@Test
public void testIdenticalExpressionIsEquivalent() {
Expression[] exprs =
diff --git
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 311cf763ee..6c744c8df4 100644
---
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -245,6 +245,68 @@ public class TestRewritePositionDeleteFilesProcedure
extends ExtensionsTestBase
EnvironmentContext.ENGINE_VERSION, v ->
assertThat(v).startsWith("4.1"));
}
+ @TestTemplate
+ public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, items
ARRAY<STRUCT<value:BIGINT, count:INT>>) "
+ + "USING iceberg TBLPROPERTIES "
+ + "('format-version'='2', 'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read')",
+ tableName);
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, 'a', array(named_struct('value', cast(10 as bigint),
'count', 1))), "
+ + "(2, 'b', array(named_struct('value', cast(20 as bigint),
'count', 2))), "
+ + "(3, 'c', array(named_struct('value', cast(30 as bigint),
'count', 3))), "
+ + "(4, 'd', array(named_struct('value', cast(40 as bigint),
'count', 4))), "
+ + "(5, 'e', array(named_struct('value', cast(50 as bigint),
'count', 5))), "
+ + "(6, 'f', array(named_struct('value', cast(60 as bigint),
'count', 6)))",
+ tableName);
+
+ sql("DELETE FROM %s WHERE id = 1", tableName);
+ sql("DELETE FROM %s WHERE id = 2", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+ sql(
+ "CALL %s.system.rewrite_position_delete_files("
+ + "table => '%s',"
+ + "options => map('rewrite-all','true'))",
+ catalogName, tableIdent);
+ }
+
+ @TestTemplate
+ public void testRewritePositionDeletesWithMapColumns() throws Exception {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, props MAP<STRING, BIGINT>) "
+ + "USING iceberg TBLPROPERTIES "
+ + "('format-version'='2', 'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read')",
+ tableName);
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, 'a', map('x', cast(10 as bigint))), "
+ + "(2, 'b', map('y', cast(20 as bigint))), "
+ + "(3, 'c', map('z', cast(30 as bigint))), "
+ + "(4, 'd', map('w', cast(40 as bigint))), "
+ + "(5, 'e', map('v', cast(50 as bigint))), "
+ + "(6, 'f', map('u', cast(60 as bigint)))",
+ tableName);
+
+ sql("DELETE FROM %s WHERE id = 1", tableName);
+ sql("DELETE FROM %s WHERE id = 2", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+ sql(
+ "CALL %s.system.rewrite_position_delete_files("
+ + "table => '%s',"
+ + "options => map('rewrite-all','true'))",
+ catalogName, tableIdent);
+ }
+
private Map<String, String> snapshotSummary() {
return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
}
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index b14970722e..c7abf219db 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -19,8 +19,6 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.PositionDeletesScanTask;
@@ -33,7 +31,6 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -86,12 +83,16 @@ class PositionDeletesRowReader extends
BaseRowReader<PositionDeletesScanTask>
InputFile inputFile = getInputFile(task.file().location());
Preconditions.checkNotNull(inputFile, "Could not find InputFile associated
with %s", task);
- // select out constant fields when pushing down filter to row reader
+ // Retain predicates on non-constant fields for row reader filter
Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
- Set<Integer> nonConstantFieldIds = nonConstantFieldIds(idToConstant);
+ int[] nonConstantFieldIds =
+ expectedSchema().idToName().keySet().stream()
+ .filter(id -> !idToConstant.containsKey(id))
+ .mapToInt(Integer::intValue)
+ .toArray();
Expression residualWithoutConstants =
ExpressionUtil.extractByIdInclusive(
- task.residual(), expectedSchema(), caseSensitive(),
Ints.toArray(nonConstantFieldIds));
+ task.residual(), expectedSchema(), caseSensitive(),
nonConstantFieldIds);
if (ContentFileUtil.isDV(task.file())) {
return new DVIterator(inputFile, task.file(), expectedSchema(),
idToConstant);
@@ -107,12 +108,4 @@ class PositionDeletesRowReader extends
BaseRowReader<PositionDeletesScanTask>
idToConstant)
.iterator();
}
-
- private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
- Set<Integer> fields = expectedSchema().idToName().keySet();
- return fields.stream()
- .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
- .filter(id -> !idToConstant.containsKey(id))
- .collect(Collectors.toSet());
- }
}
diff --git
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 0e77e70e69..14ad107e50 100644
---
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -209,6 +209,60 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
dropTable(tableName);
}
+ @TestTemplate
+ public void testArrayColumnFilter() throws IOException {
+ assumeThat(formatVersion)
+ .as("Row content in position_deletes is required for array column
filter test")
+ .isEqualTo(2);
+ String tableName = "array_column_filter";
+ Schema schemaWithArray =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(
+ 3, "arr_col", Types.ListType.ofOptional(4,
Types.IntegerType.get())));
+ Table tab = createTable(tableName, schemaWithArray,
PartitionSpec.unpartitioned());
+
+ GenericRecord record1 = GenericRecord.create(schemaWithArray);
+ record1.set(0, 1);
+ record1.set(1, "a");
+ record1.set(2, ImmutableList.of(1, 2));
+ GenericRecord record2 = GenericRecord.create(schemaWithArray);
+ record2.set(0, 2);
+ record2.set(1, "b");
+ record2.set(2, ImmutableList.of(3, 4));
+ List<Record> dataRecords = ImmutableList.of(record1, record2);
+ DataFile dFile =
+ FileHelpers.writeDataFile(
+ tab,
+ Files.localOutput(File.createTempFile("junit", null,
temp.toFile())),
+ TestHelpers.Row.of(),
+ dataRecords);
+ tab.newAppend().appendFile(dFile).commit();
+
+ List<PositionDelete<?>> deletes =
+ ImmutableList.of(
+ positionDelete(schemaWithArray, dFile.location(), 0L, 1, "a",
ImmutableList.of(1, 2)),
+ positionDelete(schemaWithArray, dFile.location(), 1L, 2, "b",
ImmutableList.of(3, 4)));
+ DeleteFile posDeletes =
+ FileHelpers.writePosDeleteFile(
+ tab,
+ Files.localOutput(File.createTempFile("junit", null,
temp.toFile())),
+ TestHelpers.Row.of(),
+ deletes,
+ formatVersion);
+ tab.newRowDelta().addDeletes(posDeletes).commit();
+
+ // Filter directly on array column: row.arr_col = array(1, 2)
+ StructLikeSet actual = actual(tableName, tab, "row.arr_col = array(1, 2)");
+ StructLikeSet expected = expected(tab, ImmutableList.of(deletes.get(0)),
null, posDeletes);
+
+ assertThat(actual)
+ .as("Filtering position_deletes by row.arr_col = array(1, 2) should
return matching row")
+ .isEqualTo(expected);
+ dropTable(tableName);
+ }
+
@TestTemplate
public void testSelect() throws IOException {
assumeThat(formatVersion).as("DVs don't have row info in
PositionDeletesTable").isEqualTo(2);