This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new e1182e2c2a Spark 4.1: Migrate SparkCopyOnWriteScan to SupportsRuntimeV2Filtering (#16295)
e1182e2c2a is described below

commit e1182e2c2a6f4282167f9c4a71889f107b99a011
Author: drexler-sky <[email protected]>
AuthorDate: Tue May 12 09:32:39 2026 -0700

    Spark 4.1: Migrate SparkCopyOnWriteScan to SupportsRuntimeV2Filtering (#16295)
---
 .../iceberg/spark/source/SparkCopyOnWriteScan.java | 54 +++++++++++++++-------
 1 file changed, 38 insertions(+), 16 deletions(-)

diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index 1892c5812a..b8b3a0edea 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -35,16 +35,16 @@ import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.expressions.Literal;
 import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
 import org.apache.spark.sql.connector.read.Statistics;
-import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
-    implements SupportsRuntimeFiltering {
+    implements SupportsRuntimeV2Filtering {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkCopyOnWriteScan.class);
 
@@ -92,17 +92,11 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
   }
 
   @Override
-  public void filter(Filter[] filters) {
-    for (Filter filter : filters) {
-      // Spark can only pass In filters at the moment
-      if (filter instanceof In
-          && ((In) 
filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) {
-        In in = (In) filter;
-
-        Set<String> fileLocations = Sets.newHashSet();
-        for (Object value : in.values()) {
-          fileLocations.add((String) value);
-        }
+  public void filter(Predicate[] predicates) {
+    for (Predicate predicate : predicates) {
+      // Spark can only pass IN predicates at the moment
+      if (isFilePathInPredicate(predicate)) {
+        Set<String> fileLocations = extractStringLiterals(predicate);
 
         // Spark may call this multiple times for UPDATEs with subqueries
         // as such cases are rewritten using UNION and the same scan on both 
sides
@@ -124,7 +118,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
           resetTasks(filteredTasks);
         }
       } else {
-        LOG.warn("Unsupported runtime filter {}", filter);
+        LOG.warn("Unsupported runtime filter {}", predicate);
       }
     }
   }
@@ -167,4 +161,32 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
         "IcebergCopyOnWriteScan(table=%s, schemaId=%s, snapshotId=%s, 
branch=%s, filters=%s, groupedBy=%s)",
         table(), schema().schemaId(), snapshotId(), branch, filtersDesc(), 
groupingKeyDesc());
   }
+
+  private static boolean isFilePathInPredicate(Predicate predicate) {
+    if (!"IN".equals(predicate.name()) || predicate.children().length < 1) {
+      return false;
+    }
+
+    if (!(predicate.children()[0] instanceof NamedReference)) {
+      return false;
+    }
+
+    String[] fieldNames = ((NamedReference) 
predicate.children()[0]).fieldNames();
+
+    return fieldNames.length == 1
+        && fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name());
+  }
+
+  private static Set<String> extractStringLiterals(Predicate predicate) {
+    Set<String> values = Sets.newHashSet();
+    for (int i = 1; i < predicate.children().length; i++) {
+      if (predicate.children()[i] instanceof Literal) {
+        Object value = ((Literal<?>) predicate.children()[i]).value();
+        // V2 string literals come through as UTF8String; toString() 
materializes the Java String
+        values.add(value.toString());
+      }
+    }
+
+    return values;
+  }
 }

Reply via email to