This is an automated email from the ASF dual-hosted git repository.
mahongbin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new fa0079dca [GLUTEN-5060][CH] Remove unnecessary FilterExec execution
when querying from MergeTree with the prewhere (#5067)
fa0079dca is described below
commit fa0079dca26e755ad4760839d4230885baf3ce5a
Author: Zhichao Zhang <[email protected]>
AuthorDate: Thu Mar 21 16:32:41 2024 +0800
[GLUTEN-5060][CH] Remove unnecessary FilterExec execution when querying
from MergeTree with the prewhere (#5067)
When querying from MergeTree with the prewhere, all the filters will be
pushed down to the ScanExec, so it does not need to execute the FilterExec again.
Close #5060.
---
.../backendsapi/clickhouse/CHSparkPlanExecApi.scala | 19 +++++++++++++------
.../extension/FallbackBroadcaseHashJoinRules.scala | 3 ++-
.../GlutenClickHouseMergeTreeWriteSuite.scala | 16 +++++++++++++---
3 files changed, 28 insertions(+), 10 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index cea88266e..29af5a0e5 100644
---
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -22,8 +22,7 @@ import io.glutenproject.exception.GlutenNotSupportException
import io.glutenproject.execution._
import io.glutenproject.expression._
import io.glutenproject.expression.ConverterUtils.FunctionConfig
-import io.glutenproject.extension.{FallbackBroadcastHashJoin,
FallbackBroadcastHashJoinPrepQueryStage}
-import io.glutenproject.extension.CountDistinctWithoutExpand
+import io.glutenproject.extension.{CountDistinctWithoutExpand,
FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage}
import io.glutenproject.extension.columnar.AddTransformHintRule
import
io.glutenproject.extension.columnar.MiscColumnarRules.TransformPreOverrides
import io.glutenproject.substrait.expression.{ExpressionBuilder,
ExpressionNode, WindowFunctionNode}
@@ -48,7 +47,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
-import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec}
+import org.apache.spark.sql.execution.datasources.{FileFormat,
HadoopFsRelation, WriteFilesExec}
import
org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
@@ -121,10 +120,18 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
override def genFilterExecTransformer(
condition: Expression,
child: SparkPlan): FilterExecTransformerBase = {
+
+ def checkMergeTreeFileFormat(relation: HadoopFsRelation): Boolean = {
+ relation.location.isInstanceOf[TahoeFileIndex] &&
+ relation.fileFormat.isInstanceOf[DeltaMergeTreeFileFormat]
+ }
+
child match {
- case scan: FileSourceScanExec
- if (scan.relation.location.isInstanceOf[TahoeFileIndex] &&
- scan.relation.fileFormat.isInstanceOf[DeltaMergeTreeFileFormat]) =>
+ case scan: FileSourceScanExec if
(checkMergeTreeFileFormat(scan.relation)) =>
+ // For the validation phase of the AddTransformHintRule
+ CHFilterExecTransformer(condition, child)
+ case scan: FileSourceScanExecTransformerBase if
(checkMergeTreeFileFormat(scan.relation)) =>
+ // For the transform phase, the FileSourceScanExec is already
transformed
CHFilterExecTransformer(condition, child)
case _ =>
FilterExecTransformer(condition, child)
diff --git
a/backends-clickhouse/src/main/java/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala
b/backends-clickhouse/src/main/scala/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala
similarity index 98%
rename from
backends-clickhouse/src/main/java/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala
rename to
backends-clickhouse/src/main/scala/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala
index ccfa16501..52ec8ce47 100644
---
a/backends-clickhouse/src/main/java/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala
+++
b/backends-clickhouse/src/main/scala/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala
@@ -100,7 +100,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session:
SparkSession) extend
case class FallbackBroadcastHashJoin(session: SparkSession) extends
Rule[SparkPlan] {
private val enableColumnarBroadcastJoin: Boolean =
- GlutenConfig.getConf.enableColumnarBroadcastJoin &&
GlutenConfig.getConf.enableColumnarBroadcastExchange
+ GlutenConfig.getConf.enableColumnarBroadcastJoin &&
+ GlutenConfig.getConf.enableColumnarBroadcastExchange
override def apply(plan: SparkPlan): SparkPlan = {
plan.foreachUp {
diff --git
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index ca192cb89..457c88e34 100644
---
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -129,12 +129,13 @@ class GlutenClickHouseMergeTreeWriteSuite
|""".stripMargin
runTPCHQueryBySQL(1, sqlStr) {
df =>
- val scanExec = collect(df.queryExecution.executedPlan) {
+ val plans = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
+ case w: WholeStageTransformer => w
}
- assert(scanExec.size == 1)
+ assert(plans.size == 4)
- val mergetreeScan = scanExec(0)
+ val mergetreeScan =
plans(3).asInstanceOf[FileSourceScanExecTransformer]
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
@@ -148,6 +149,15 @@ class GlutenClickHouseMergeTreeWriteSuite
assert(
addFiles.map(_.rows).sum
== 600572)
+
+ // GLUTEN-5060: check the unnecessary FilterExec
+ val wholeStageTransformer =
plans(2).asInstanceOf[WholeStageTransformer]
+ val planNodeJson = wholeStageTransformer.substraitPlanJson
+ assert(
+ !planNodeJson
+ .replaceAll("\\\n", "")
+ .replaceAll(" ", "")
+ .contains("\"input\":{\"filter\":{"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]