This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new bc8f264be1 Spark: Backport migrate SparkCopyOnWriteScan to SupportsRuntimeV2Filtering (#16303)
bc8f264be1 is described below
commit bc8f264be1a77f5979616d582fa121c681b87ca7
Author: drexler-sky <[email protected]>
AuthorDate: Wed May 13 23:30:33 2026 -0700
Spark: Backport migrate SparkCopyOnWriteScan to SupportsRuntimeV2Filtering (#16303)
* Spark: Backport migrate SparkCopyOnWriteScan to SupportsRuntimeV2Filtering
* trigger build
* Update spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
* Update spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
* Update spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
* add back import
* spark 3.4: SupportsRuntimeFiltering -> SupportsRuntimeV2Filtering
---------
Co-authored-by: Kevin Liu <[email protected]>
Co-authored-by: Kevin Liu <[email protected]>
---
.../RowLevelCommandDynamicPruning.scala | 4 +-
.../iceberg/spark/source/SparkCopyOnWriteScan.java | 54 +++++++++++++++-------
.../iceberg/spark/source/SparkCopyOnWriteScan.java | 54 +++++++++++++++-------
.../iceberg/spark/source/SparkCopyOnWriteScan.java | 54 +++++++++++++++-------
4 files changed, 116 insertions(+), 50 deletions(-)
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
index f8acef9fe3..6766ad338b 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.plans.logical.UpdateIcebergTable
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import org.apache.spark.sql.catalyst.trees.TreePattern.SORT
-import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits
@@ -67,7 +67,7 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession)
// apply special dynamic filtering only for plans that don't support deltas
case RewrittenRowLevelCommand(
command: RowLevelCommand,
- DataSourceV2ScanRelation(_, scan: SupportsRuntimeFiltering, _, _, _),
+ DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _),
rewritePlan: ReplaceIcebergData)
if conf.dynamicPartitionPruningEnabled && isCandidate(command) =>
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index dbf5d455b9..9674a7333f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -39,11 +39,11 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Statistics;
-import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
@@ -52,7 +52,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
- implements SupportsRuntimeFiltering {
+ implements SupportsRuntimeV2Filtering {
private static final Logger LOG = LoggerFactory.getLogger(SparkCopyOnWriteScan.class);
@@ -118,7 +118,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
}
@Override
- public void filter(Filter[] filters) {
+ public void filter(Predicate[] predicates) {
Preconditions.checkState(
Objects.equals(snapshotId(), currentSnapshotId()),
"Runtime file filtering is not possible: the table has been concurrently modified. "
@@ -128,16 +128,10 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
snapshotId(),
currentSnapshotId());
- for (Filter filter : filters) {
- // Spark can only pass In filters at the moment
- if (filter instanceof In
- && ((In) filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) {
- In in = (In) filter;
-
- Set<String> fileLocations = Sets.newHashSet();
- for (Object value : in.values()) {
- fileLocations.add((String) value);
- }
+ for (Predicate predicate : predicates) {
+ // Spark can only pass IN predicates at the moment
+ if (isFilePathInPredicate(predicate)) {
+ Set<String> fileLocations = extractStringLiterals(predicate);
// Spark may call this multiple times for UPDATEs with subqueries
// as such cases are rewritten using UNION and the same scan on both sides
@@ -159,7 +153,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
resetTasks(filteredTasks);
}
} else {
- LOG.warn("Unsupported runtime filter {}", filter);
+ LOG.warn("Unsupported runtime filter {}", predicate);
}
}
}
@@ -228,4 +222,32 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
|| field.name().equals(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
return hasLineageFieldName && field.metadata().contains("__metadata_col");
}
+
+ private static boolean isFilePathInPredicate(Predicate predicate) {
+ if (!"IN".equals(predicate.name()) || predicate.children().length < 1) {
+ return false;
+ }
+
+ if (!(predicate.children()[0] instanceof NamedReference)) {
+ return false;
+ }
+
+ String[] fieldNames = ((NamedReference) predicate.children()[0]).fieldNames();
+
+ return fieldNames.length == 1
+ && fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name());
+ }
+
+ private static Set<String> extractStringLiterals(Predicate predicate) {
+ Set<String> values = Sets.newHashSet();
+ for (int i = 1; i < predicate.children().length; i++) {
+ if (predicate.children()[i] instanceof Literal) {
+ Object value = ((Literal<?>) predicate.children()[i]).value();
+ // V2 string literals come through as UTF8String; toString() materializes the Java String
+ values.add(value.toString());
+ }
+ }
+
+ return values;
+ }
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index dbf5d455b9..9674a7333f 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -39,11 +39,11 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Statistics;
-import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
@@ -52,7 +52,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
- implements SupportsRuntimeFiltering {
+ implements SupportsRuntimeV2Filtering {
private static final Logger LOG = LoggerFactory.getLogger(SparkCopyOnWriteScan.class);
@@ -118,7 +118,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
}
@Override
- public void filter(Filter[] filters) {
+ public void filter(Predicate[] predicates) {
Preconditions.checkState(
Objects.equals(snapshotId(), currentSnapshotId()),
"Runtime file filtering is not possible: the table has been concurrently modified. "
@@ -128,16 +128,10 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
snapshotId(),
currentSnapshotId());
- for (Filter filter : filters) {
- // Spark can only pass In filters at the moment
- if (filter instanceof In
- && ((In) filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) {
- In in = (In) filter;
-
- Set<String> fileLocations = Sets.newHashSet();
- for (Object value : in.values()) {
- fileLocations.add((String) value);
- }
+ for (Predicate predicate : predicates) {
+ // Spark can only pass IN predicates at the moment
+ if (isFilePathInPredicate(predicate)) {
+ Set<String> fileLocations = extractStringLiterals(predicate);
// Spark may call this multiple times for UPDATEs with subqueries
// as such cases are rewritten using UNION and the same scan on both sides
@@ -159,7 +153,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
resetTasks(filteredTasks);
}
} else {
- LOG.warn("Unsupported runtime filter {}", filter);
+ LOG.warn("Unsupported runtime filter {}", predicate);
}
}
}
@@ -228,4 +222,32 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
|| field.name().equals(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
return hasLineageFieldName && field.metadata().contains("__metadata_col");
}
+
+ private static boolean isFilePathInPredicate(Predicate predicate) {
+ if (!"IN".equals(predicate.name()) || predicate.children().length < 1) {
+ return false;
+ }
+
+ if (!(predicate.children()[0] instanceof NamedReference)) {
+ return false;
+ }
+
+ String[] fieldNames = ((NamedReference) predicate.children()[0]).fieldNames();
+
+ return fieldNames.length == 1
+ && fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name());
+ }
+
+ private static Set<String> extractStringLiterals(Predicate predicate) {
+ Set<String> values = Sets.newHashSet();
+ for (int i = 1; i < predicate.children().length; i++) {
+ if (predicate.children()[i] instanceof Literal) {
+ Object value = ((Literal<?>) predicate.children()[i]).value();
+ // V2 string literals come through as UTF8String; toString() materializes the Java String
+ values.add(value.toString());
+ }
+ }
+
+ return values;
+ }
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index ee4be24618..f957b97d60 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -38,16 +38,16 @@ import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Statistics;
-import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
- implements SupportsRuntimeFiltering {
+ implements SupportsRuntimeV2Filtering {
private static final Logger LOG = LoggerFactory.getLogger(SparkCopyOnWriteScan.class);
@@ -103,7 +103,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
}
@Override
- public void filter(Filter[] filters) {
+ public void filter(Predicate[] predicates) {
Preconditions.checkState(
Objects.equals(snapshotId(), currentSnapshotId()),
"Runtime file filtering is not possible: the table has been concurrently modified. "
@@ -113,16 +113,10 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
snapshotId(),
currentSnapshotId());
- for (Filter filter : filters) {
- // Spark can only pass In filters at the moment
- if (filter instanceof In
- && ((In) filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) {
- In in = (In) filter;
-
- Set<String> fileLocations = Sets.newHashSet();
- for (Object value : in.values()) {
- fileLocations.add((String) value);
- }
+ for (Predicate predicate : predicates) {
+ // Spark can only pass IN predicates at the moment
+ if (isFilePathInPredicate(predicate)) {
+ Set<String> fileLocations = extractStringLiterals(predicate);
// Spark may call this multiple times for UPDATEs with subqueries
// as such cases are rewritten using UNION and the same scan on both sides
@@ -144,7 +138,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
resetTasks(filteredTasks);
}
} else {
- LOG.warn("Unsupported runtime filter {}", filter);
+ LOG.warn("Unsupported runtime filter {}", predicate);
}
}
}
@@ -188,4 +182,32 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table(), branch());
return currentSnapshot != null ? currentSnapshot.snapshotId() : null;
}
+
+ private static boolean isFilePathInPredicate(Predicate predicate) {
+ if (!"IN".equals(predicate.name()) || predicate.children().length < 1) {
+ return false;
+ }
+
+ if (!(predicate.children()[0] instanceof NamedReference)) {
+ return false;
+ }
+
+ String[] fieldNames = ((NamedReference) predicate.children()[0]).fieldNames();
+
+ return fieldNames.length == 1
+ && fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name());
+ }
+
+ private static Set<String> extractStringLiterals(Predicate predicate) {
+ Set<String> values = Sets.newHashSet();
+ for (int i = 1; i < predicate.children().length; i++) {
+ if (predicate.children()[i] instanceof Literal) {
+ Object value = ((Literal<?>) predicate.children()[i]).value();
+ // V2 string literals come through as UTF8String; toString() materializes the Java String
+ values.add(value.toString());
+ }
+ }
+
+ return values;
+ }
}