This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 469ee6ebf perf: Add `COMET_RESPECT_PARQUET_FILTER_PUSHDOWN` config (#1936) 469ee6ebf is described below commit 469ee6ebf171c4304f673b77ce9f2bbf55d0f8f3 Author: Andy Grove <agr...@apache.org> AuthorDate: Fri Jun 27 14:59:06 2025 -0600 perf: Add `COMET_RESPECT_PARQUET_FILTER_PUSHDOWN` config (#1936) --- common/src/main/scala/org/apache/comet/CometConf.scala | 10 ++++++++++ dev/diffs/3.4.3.diff | 5 +++-- dev/diffs/3.5.6.diff | 5 +++-- dev/diffs/4.0.0-preview1.diff | 5 +++-- docs/source/user-guide/configs.md | 1 + .../org/apache/comet/parquet/CometParquetFileFormat.scala | 3 ++- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 15 ++++++++++++--- .../scala/org/apache/comet/parquet/ParquetReadSuite.scala | 6 +----- .../test/scala/org/apache/spark/sql/CometTestBase.scala | 1 + 9 files changed, 36 insertions(+), 15 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 555beb5cb..ee18fb950 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -106,6 +106,16 @@ object CometConf extends ShimCometConf { .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET) .toLowerCase(Locale.ROOT)) + val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] = + conf("spark.comet.parquet.respectFilterPushdown") + .doc( + "Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. 
This needs to be " + + "respected when running the Spark SQL test suite but the default setting " + + "results in poor performance in Comet when using the new native scans, " + + "disabled by default") + .booleanConf + .createWithDefault(false) + val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.parquet.read.parallel.io.enabled") .doc( diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index c4d318e0b..e08aea606 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2798,10 +2798,10 @@ index dd55fcfe42c..a1d390c93d0 100644 spark.internalCreateDataFrame(withoutFilters.execute(), schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -index ed2e309fa07..71ba6533c9d 100644 +index ed2e309fa07..a1fb4abe681 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -@@ -74,6 +74,31 @@ trait SharedSparkSessionBase +@@ -74,6 +74,32 @@ trait SharedSparkSessionBase // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. 
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) @@ -2810,6 +2810,7 @@ index ed2e309fa07..71ba6533c9d 100644 + conf + .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions") + .set("spark.comet.enabled", "true") ++ .set("spark.comet.parquet.respectFilterPushdown", "true") + + if (!isCometScanOnly) { + conf diff --git a/dev/diffs/3.5.6.diff b/dev/diffs/3.5.6.diff index 98de916d0..883a16715 100644 --- a/dev/diffs/3.5.6.diff +++ b/dev/diffs/3.5.6.diff @@ -2770,10 +2770,10 @@ index e937173a590..ca06132102d 100644 spark.internalCreateDataFrame(withoutFilters.execute(), schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -index ed2e309fa07..71ba6533c9d 100644 +index ed2e309fa07..a1fb4abe681 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -@@ -74,6 +74,31 @@ trait SharedSparkSessionBase +@@ -74,6 +74,32 @@ trait SharedSparkSessionBase // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. 
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) @@ -2782,6 +2782,7 @@ index ed2e309fa07..71ba6533c9d 100644 + conf + .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions") + .set("spark.comet.enabled", "true") ++ .set("spark.comet.parquet.respectFilterPushdown", "true") + + if (!isCometScanOnly) { + conf diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff index 8686b4456..e57a245f0 100644 --- a/dev/diffs/4.0.0-preview1.diff +++ b/dev/diffs/4.0.0-preview1.diff @@ -3009,10 +3009,10 @@ index 5fbf379644f..d0575e1df69 100644 spark.internalCreateDataFrame(withoutFilters.execute(), schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -index ed2e309fa07..71ba6533c9d 100644 +index ed2e309fa07..a1fb4abe681 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -@@ -74,6 +74,31 @@ trait SharedSparkSessionBase +@@ -74,6 +74,32 @@ trait SharedSparkSessionBase // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) @@ -3021,6 +3021,7 @@ index ed2e309fa07..71ba6533c9d 100644 + conf + .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions") + .set("spark.comet.enabled", "true") ++ .set("spark.comet.parquet.respectFilterPushdown", "true") + + if (!isCometScanOnly) { + conf diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 96bd7cec1..6544909aa 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -84,6 +84,7 @@ Comet provides the following configuration settings. 
| spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 | | spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true | | spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 | +| spark.comet.parquet.respectFilterPushdown | Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be respected when running the Spark SQL test suite but the default setting results in poor performance in Comet when using the new native scans, disabled by default | false | | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | | spark.comet.scan.allowIncompatible | Some Comet scan implementations are not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. 
| true | diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala index 871ac2704..5c2de2a6b 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala @@ -99,7 +99,8 @@ class CometParquetFileFormat(scanImpl: String) val optionsMap = CaseInsensitiveMap[String](options) val parquetOptions = new ParquetOptions(optionsMap, sqlConf) val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead - val parquetFilterPushDown = sqlConf.parquetFilterPushDown + val parquetFilterPushDown = sqlConf.parquetFilterPushDown && + CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(sqlConf) // Comet specific configurations val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 1e3cec285..e445cbb7c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -22,6 +22,7 @@ package org.apache.comet.serde import java.util.Locale import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer import scala.math.min import org.apache.spark.internal.Logging @@ -2192,9 +2193,17 @@ object QueryPlanSerde extends Logging with CometExprShim { // Sink operators don't have children result.clearChildren() - if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED)) { - // TODO remove flatMap and add error handling for unsupported data filters - val dataFilters = scan.dataFilters.flatMap(exprToProto(_, scan.output)) + if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED) && + CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(conf)) { + + val dataFilters = new ListBuffer[Expr]() + for (filter <- scan.dataFilters) 
{ + exprToProto(filter, scan.output) match { + case Some(proto) => dataFilters += proto + case _ => + logWarning(s"Unsupported data filter $filter") + } + } nativeScanBuilder.addAllDataFilters(dataFilters.asJava) } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 4801678a4..581c60611 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -26,7 +26,7 @@ import java.time.{ZoneId, ZoneOffset} import scala.collection.mutable.ListBuffer import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag -import scala.util.control.Breaks.{break, breakable} +import scala.util.control.Breaks.breakable import org.scalactic.source.Position import org.scalatest.Tag @@ -1902,10 +1902,6 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode, SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown.toString) { - if (scanMode == CometConf.SCAN_NATIVE_DATAFUSION && !pushDown) { - // FIXME: native_datafusion always pushdown data filters - break() - } Seq( ("_1 = true", Math.ceil(rows.toDouble / 2)), // Boolean ("_2 = 1", Math.ceil(rows.toDouble / 256)), // Byte diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 05c46e307..a2663cf0b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -78,6 +78,7 @@ abstract class CometTestBase conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") + conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true") 
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true") conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true") // set the scan impl to SCAN_NATIVE_COMET because many tests are implemented
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org