This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 469ee6ebf perf: Add `COMET_RESPECT_PARQUET_FILTER_PUSHDOWN` config (#1936)
469ee6ebf is described below

commit 469ee6ebf171c4304f673b77ce9f2bbf55d0f8f3
Author: Andy Grove <agr...@apache.org>
AuthorDate: Fri Jun 27 14:59:06 2025 -0600

    perf: Add `COMET_RESPECT_PARQUET_FILTER_PUSHDOWN` config (#1936)
---
 common/src/main/scala/org/apache/comet/CometConf.scala    | 10 ++++++++++
 dev/diffs/3.4.3.diff                                      |  5 +++--
 dev/diffs/3.5.6.diff                                      |  5 +++--
 dev/diffs/4.0.0-preview1.diff                             |  5 +++--
 docs/source/user-guide/configs.md                         |  1 +
 .../org/apache/comet/parquet/CometParquetFileFormat.scala |  3 ++-
 .../scala/org/apache/comet/serde/QueryPlanSerde.scala     | 15 ++++++++++++---
 .../scala/org/apache/comet/parquet/ParquetReadSuite.scala |  6 +-----
 .../test/scala/org/apache/spark/sql/CometTestBase.scala   |  1 +
 9 files changed, 36 insertions(+), 15 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 555beb5cb..ee18fb950 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -106,6 +106,16 @@ object CometConf extends ShimCometConf {
       .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
       .toLowerCase(Locale.ROOT))
 
+  val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.respectFilterPushdown")
+      .doc(
+        "Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. 
This needs to be " +
+          "respected when running the Spark SQL test suite but the default 
setting " +
+          "results in poor performance in Comet when using the new native 
scans, " +
+          "disabled by default")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.read.parallel.io.enabled")
       .doc(
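
For anyone trying the new flag outside the Spark SQL test suites, here is a minimal sketch of enabling it when building a session. The config keys are the ones added in this commit; the app name and session setup are illustrative, not part of the change:

    import org.apache.spark.sql.SparkSession

    // Sketch only: enable Comet and opt in to respecting Spark's
    // Parquet filter pushdown setting (off by default after this change).
    val spark = SparkSession
      .builder()
      .appName("comet-pushdown-demo") // illustrative name
      .config("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
      .config("spark.comet.enabled", "true")
      .config("spark.comet.parquet.respectFilterPushdown", "true")
      .getOrCreate()
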
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index c4d318e0b..e08aea606 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -2798,10 +2798,10 @@ index dd55fcfe42c..a1d390c93d0 100644
  
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..71ba6533c9d 100644
+index ed2e309fa07..a1fb4abe681 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
       // this rule may potentially block testing of other optimization rules such as
       // ConstantPropagation etc.
       .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2810,6 +2810,7 @@ index ed2e309fa07..71ba6533c9d 100644
 +      conf
 +        .set("spark.sql.extensions", 
"org.apache.comet.CometSparkSessionExtensions")
 +        .set("spark.comet.enabled", "true")
++        .set("spark.comet.parquet.respectFilterPushdown", "true")
 +
 +      if (!isCometScanOnly) {
 +        conf
diff --git a/dev/diffs/3.5.6.diff b/dev/diffs/3.5.6.diff
index 98de916d0..883a16715 100644
--- a/dev/diffs/3.5.6.diff
+++ b/dev/diffs/3.5.6.diff
@@ -2770,10 +2770,10 @@ index e937173a590..ca06132102d 100644
  
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..71ba6533c9d 100644
+index ed2e309fa07..a1fb4abe681 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
       // this rule may potentially block testing of other optimization rules such as
       // ConstantPropagation etc.
       .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2782,6 +2782,7 @@ index ed2e309fa07..71ba6533c9d 100644
 +      conf
 +        .set("spark.sql.extensions", 
"org.apache.comet.CometSparkSessionExtensions")
 +        .set("spark.comet.enabled", "true")
++        .set("spark.comet.parquet.respectFilterPushdown", "true")
 +
 +      if (!isCometScanOnly) {
 +        conf
diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff
index 8686b4456..e57a245f0 100644
--- a/dev/diffs/4.0.0-preview1.diff
+++ b/dev/diffs/4.0.0-preview1.diff
@@ -3009,10 +3009,10 @@ index 5fbf379644f..d0575e1df69 100644
  
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..71ba6533c9d 100644
+index ed2e309fa07..a1fb4abe681 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
       // this rule may potentially block testing of other optimization rules such as
       // ConstantPropagation etc.
       .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -3021,6 +3021,7 @@ index ed2e309fa07..71ba6533c9d 100644
 +      conf
 +        .set("spark.sql.extensions", 
"org.apache.comet.CometSparkSessionExtensions")
 +        .set("spark.comet.enabled", "true")
++        .set("spark.comet.parquet.respectFilterPushdown", "true")
 +
 +      if (!isCometScanOnly) {
 +        conf
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 96bd7cec1..6544909aa 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -84,6 +84,7 @@ Comet provides the following configuration settings.
 | spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 |
 | spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
 | spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
+| spark.comet.parquet.respectFilterPushdown | Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be respected when running the Spark SQL test suite, but the default setting results in poor performance in Comet when using the new native scans. Disabled by default. | false |
 | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.allowIncompatible | Some Comet scan implementations are not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index 871ac2704..5c2de2a6b 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -99,7 +99,8 @@ class CometParquetFileFormat(scanImpl: String)
     val optionsMap = CaseInsensitiveMap[String](options)
     val parquetOptions = new ParquetOptions(optionsMap, sqlConf)
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
-    val parquetFilterPushDown = sqlConf.parquetFilterPushDown
+    val parquetFilterPushDown = sqlConf.parquetFilterPushDown &&
+      CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(sqlConf)
 
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
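
The effective pushdown flag is now the conjunction of Spark's setting and the new Comet config. A minimal sketch of the resulting behavior, with plain booleans standing in for the sqlConf lookups (the helper name is illustrative, not part of the commit):

    // Hypothetical helper illustrating the gating introduced above.
    def effectivePushdown(sparkPushdown: Boolean, cometRespects: Boolean): Boolean =
      sparkPushdown && cometRespects

    assert(effectivePushdown(sparkPushdown = true, cometRespects = true))   // Spark SQL test suites
    assert(!effectivePushdown(sparkPushdown = true, cometRespects = false)) // default: skip pushdown
    assert(!effectivePushdown(sparkPushdown = false, cometRespects = true)) // Spark disabled it
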
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 1e3cec285..e445cbb7c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -22,6 +22,7 @@ package org.apache.comet.serde
 import java.util.Locale
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 import scala.math.min
 
 import org.apache.spark.internal.Logging
@@ -2192,9 +2193,17 @@ object QueryPlanSerde extends Logging with CometExprShim {
           // Sink operators don't have children
           result.clearChildren()
 
-          if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED)) {
-            // TODO remove flatMap and add error handling for unsupported data filters
-            val dataFilters = scan.dataFilters.flatMap(exprToProto(_, scan.output))
+          if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED) &&
+            CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(conf)) {
+
+            val dataFilters = new ListBuffer[Expr]()
+            for (filter <- scan.dataFilters) {
+              exprToProto(filter, scan.output) match {
+                case Some(proto) => dataFilters += proto
+                case _ =>
+                  logWarning(s"Unsupported data filter $filter")
+              }
+            }
             nativeScanBuilder.addAllDataFilters(dataFilters.asJava)
           }
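
The new loop keeps the old flatMap semantics (unsupported filters are still skipped) but surfaces them in the logs instead of dropping them silently. A self-contained sketch of the same pattern, with toy stand-ins for the real Catalyst/protobuf types and the exprToProto serializer:

    import scala.collection.mutable.ListBuffer

    case class Filter(sql: String) // stand-in for a Catalyst expression
    case class Expr(repr: String)  // stand-in for the protobuf Expr

    // Toy serializer: succeeds only for equality filters (illustration only).
    def exprToProto(f: Filter): Option[Expr] =
      if (f.sql.contains("=")) Some(Expr(f.sql)) else None

    val dataFilters = new ListBuffer[Expr]()
    for (filter <- Seq(Filter("_1 = true"), Filter("_2 LIKE 'x%'"))) {
      exprToProto(filter) match {
        case Some(proto) => dataFilters += proto
        case None        => println(s"Unsupported data filter $filter") // logWarning in the real code
      }
    }
    // dataFilters now holds only the filters that could be serialized.
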
 
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 4801678a4..581c60611 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -26,7 +26,7 @@ import java.time.{ZoneId, ZoneOffset}
 import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
-import scala.util.control.Breaks.{break, breakable}
+import scala.util.control.Breaks.breakable
 
 import org.scalactic.source.Position
 import org.scalatest.Tag
@@ -1902,10 +1902,6 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
               withSQLConf(
                 CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode,
                 SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown.toString) {
-                if (scanMode == CometConf.SCAN_NATIVE_DATAFUSION && !pushDown) {
-                  // FIXME: native_datafusion always pushdown data filters
-                  break()
-                }
                 Seq(
                   ("_1 = true", Math.ceil(rows.toDouble / 2)), // Boolean
                   ("_2 = 1", Math.ceil(rows.toDouble / 256)), // Byte
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 05c46e307..a2663cf0b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -78,6 +78,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
+    conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
     // set the scan impl to SCAN_NATIVE_COMET because many tests are implemented

