This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 686a6eb05 [spark] Enhance spark push down filter (#2376)
686a6eb05 is described below

commit 686a6eb05cb5295c6282284a5d16a9e18f98640f
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Nov 28 12:27:41 2023 +0800

    [spark] Enhance spark push down filter (#2376)
---
 .../predicate/PartitionPredicateVisitor.java       | 48 +++++++++++
 .../paimon/flink/source/FlinkTableSource.java      | 23 +----
 .../paimon/flink/source/FlinkTableSourceTest.java  |  8 +-
 .../org/apache/paimon/spark/SparkScanBuilder.java  | 35 ++++++--
 .../apache/paimon/spark/SparkPushDownTest.scala    | 97 ++++++++++++++++++++++
 5 files changed, 178 insertions(+), 33 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
new file mode 100644
index 000000000..b859c7a56
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import java.util.List;
+
+/** Visit the predicate and check if it only contains partition key's 
predicate. */
+public class PartitionPredicateVisitor implements PredicateVisitor<Boolean> {
+
+    private final List<String> partitionKeys;
+
+    public PartitionPredicateVisitor(List<String> partitionKeys) {
+        this.partitionKeys = partitionKeys;
+    }
+
+    @Override
+    public Boolean visit(LeafPredicate predicate) {
+        return partitionKeys.contains(predicate.fieldName());
+    }
+
+    @Override
+    public Boolean visit(CompoundPredicate predicate) {
+        for (Predicate child : predicate.children()) {
+            Boolean matched = child.visit(this);
+
+            if (!matched) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index c35b46444..68209240e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -20,8 +20,7 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.PredicateConverter;
-import org.apache.paimon.predicate.CompoundPredicate;
-import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.PartitionPredicateVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.PredicateVisitor;
@@ -83,25 +82,7 @@ public abstract class FlinkTableSource {
         List<ResolvedExpression> unConsumedFilters = new ArrayList<>();
         List<ResolvedExpression> consumedFilters = new ArrayList<>();
         List<Predicate> converted = new ArrayList<>();
-        PredicateVisitor<Boolean> visitor =
-                new PredicateVisitor<Boolean>() {
-                    @Override
-                    public Boolean visit(LeafPredicate predicate) {
-                        return partitionKeys.contains(predicate.fieldName());
-                    }
-
-                    @Override
-                    public Boolean visit(CompoundPredicate predicate) {
-                        for (Predicate child : predicate.children()) {
-                            Boolean matched = child.visit(this);
-
-                            if (!matched) {
-                                return false;
-                            }
-                        }
-                        return true;
-                    }
-                };
+        PredicateVisitor<Boolean> visitor = new 
PartitionPredicateVisitor(partitionKeys);
 
         for (ResolvedExpression filter : filters) {
             Optional<Predicate> predicateOptional = 
PredicateConverter.convert(rowType, filter);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
index 350fe9e3a..ef48099c7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
@@ -86,11 +86,11 @@ public class FlinkTableSourceTest extends TableTestBase {
         Assertions.assertThat(tableSource.pushFilters(filters))
                 .isEqualTo(ImmutableList.of(filters.get(0)));
 
-        // col1 = 1 && p2 like '%a' => [p2 like '%a']
-        filters = ImmutableList.of(p2Like("%a"));
+        // col1 = 1 && p2 like '%a' => None
+        filters = ImmutableList.of(col1Equal1(), p2Like("%a"));
         
Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
 
-        // col1 = 1 && p2 like 'a%' => None
+        // col1 = 1 && p2 like 'a%' => [p2 like 'a%']
         filters = ImmutableList.of(col1Equal1(), p2Like("a%"));
         Assertions.assertThat(tableSource.pushFilters(filters))
                 .isEqualTo(ImmutableList.of(filters.get(0)));
@@ -99,7 +99,7 @@ public class FlinkTableSourceTest extends TableTestBase {
         filters = ImmutableList.of(rand());
         
Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
 
-        // upper(p1) = "A"
+        // upper(p1) = "A" => [upper(p1) = "A"]
         filters = ImmutableList.of(upperP2EqualA());
         
Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
index ec7367ed2..06652ff9f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.predicate.PartitionPredicateVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.Table;
 
@@ -27,6 +28,8 @@ import 
org.apache.spark.sql.connector.read.SupportsPushDownFilters;
 import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -34,7 +37,7 @@ import java.util.List;
 /** A Spark {@link ScanBuilder} for paimon. */
 public class SparkScanBuilder
         implements ScanBuilder, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns {
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(SparkScanBuilder.class);
     private final Table table;
 
     private List<Predicate> predicates = new ArrayList<>();
@@ -47,19 +50,35 @@ public class SparkScanBuilder
 
     @Override
     public Filter[] pushFilters(Filter[] filters) {
+        // There are 3 kinds of filters:
+        // (1) pushable filters which don't need to be evaluated again after 
scanning, e.g. filter
+        // partitions.
+        // (2) pushable filters which still need to be evaluated after 
scanning.
+        // (3) non-pushable filters.
+        // case 1 and 2 are considered as pushable filters and will be 
returned by pushedFilters().
+        // case 2 and 3 are considered as postScan filters and should be 
return by this method.
+        List<Filter> pushable = new ArrayList<>(filters.length);
+        List<Filter> postScan = new ArrayList<>(filters.length);
+        List<Predicate> predicates = new ArrayList<>(filters.length);
+
         SparkFilterConverter converter = new 
SparkFilterConverter(table.rowType());
-        List<Predicate> predicates = new ArrayList<>();
-        List<Filter> pushed = new ArrayList<>();
+        PartitionPredicateVisitor visitor = new 
PartitionPredicateVisitor(table.partitionKeys());
         for (Filter filter : filters) {
             try {
-                predicates.add(converter.convert(filter));
-                pushed.add(filter);
-            } catch (UnsupportedOperationException ignore) {
+                Predicate predicate = converter.convert(filter);
+                predicates.add(predicate);
+                pushable.add(filter);
+                if (!predicate.visit(visitor)) {
+                    postScan.add(filter);
+                }
+            } catch (UnsupportedOperationException e) {
+                LOG.warn(e.getMessage());
+                postScan.add(filter);
             }
         }
         this.predicates = predicates;
-        this.pushedFilters = pushed.toArray(new Filter[0]);
-        return filters;
+        this.pushedFilters = pushable.toArray(new Filter[0]);
+        return postScan.toArray(new Filter[0]);
     }
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala
new file mode 100644
index 000000000..6d1360b95
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.junit.jupiter.api.Assertions
+
+class SparkPushDownTest extends PaimonSparkTestBase {
+
+  test(s"Paimon push down: apply partition filter push down with 
non-partitioned table") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 
'p2')")
+
+    val q = "SELECT * FROM T WHERE pt = 'p1'"
+    Assertions.assertTrue(checkEqualToFilterExists(q, "pt", Literal("p1")))
+    checkAnswer(spark.sql(q), Row(1, "a", "p1") :: Row(2, "b", "p1") :: Nil)
+  }
+
+  test(s"Paimon push down: apply partition filter push down with partitioned 
table") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 
'p2'), (4, 'd', 'p3')")
+
+    // partition filter push down did not hit cases:
+    // case 1
+    var q = "SELECT * FROM T WHERE id = '1'"
+    Assertions.assertTrue(checkFilterExists(q))
+    checkAnswer(spark.sql(q), Row(1, "a", "p1") :: Nil)
+
+    // case 2
+    // filter "id = '1' or pt = 'p1'" can't push down completely, it still 
need to be evaluated after scanning
+    q = "SELECT * FROM T WHERE id = '1' or pt = 'p1'"
+    Assertions.assertTrue(checkEqualToFilterExists(q, "pt", Literal("p1")))
+    checkAnswer(spark.sql(q), Row(1, "a", "p1") :: Row(2, "b", "p1") :: Nil)
+
+    // partition filter push down hit cases:
+    // case 1
+    q = "SELECT * FROM T WHERE pt = 'p1'"
+    Assertions.assertFalse(checkFilterExists(q))
+    checkAnswer(spark.sql(q), Row(1, "a", "p1") :: Row(2, "b", "p1") :: Nil)
+
+    // case 2
+    q = "SELECT * FROM T WHERE id = '1' and pt = 'p1'"
+    Assertions.assertFalse(checkEqualToFilterExists(q, "pt", Literal("p1")))
+    checkAnswer(spark.sql(q), Row(1, "a", "p1") :: Nil)
+
+    // case 3
+    q = "SELECT * FROM T WHERE pt < 'p3'"
+    Assertions.assertFalse(checkFilterExists(q))
+    checkAnswer(spark.sql(q), Row(1, "a", "p1") :: Row(2, "b", "p1") :: Row(3, 
"c", "p2") :: Nil)
+  }
+
+  private def checkFilterExists(sql: String): Boolean = {
+    spark.sql(sql).queryExecution.optimizedPlan.exists {
+      case Filter(_: Expression, _) => true
+      case _ => false
+    }
+  }
+
+  private def checkEqualToFilterExists(sql: String, name: String, value: 
Literal): Boolean = {
+    spark.sql(sql).queryExecution.optimizedPlan.exists {
+      case Filter(c: Expression, _) =>
+        c.exists {
+          case EqualTo(a: AttributeReference, r: Literal) =>
+            a.name.equals(name) && r.equals(value)
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+
+}

Reply via email to