rishvin opened a new issue, #1831:
URL: https://github.com/apache/datafusion-comet/issues/1831
### Describe the bug
The following test cases failed when they were enabled (relates to #1739):
- Runtime bloom filter join: do not add bloom filter if dpp filter exists on
the same column
- Runtime bloom filter join: add bloom filter if dpp filter exists on a
different column
Both tests failed with the following stacktrace:
```
Custom columnar rules cannot transform shuffle node to something else.
java.lang.IllegalStateException: Custom columnar rules cannot transform
shuffle node to something else.
at
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.newQueryStage(AdaptiveSparkPlanExec.scala:554)
at
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:506)
at
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:536)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:536)
at
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:536)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:536)
at
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:247)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:242)
at
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:387)
at
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:360)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218)
at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3459)
at
org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4208)
at
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4206)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4206)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:3459)
at
org.apache.spark.sql.InjectRuntimeFilterSuite.$anonfun$checkWithAndWithoutFeatureEnabled$1(InjectRuntimeFilterSuite.scala:246)
at
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
at
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
at
org.apache.spark.sql.InjectRuntimeFilterSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(InjectRuntimeFilterSuite.scala:31)
at
org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:275)
at
org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:273)
at
org.apache.spark.sql.InjectRuntimeFilterSuite.withSQLConf(InjectRuntimeFilterSuite.scala:31)
at
org.apache.spark.sql.InjectRuntimeFilterSuite.checkWithAndWithoutFeatureEnabled(InjectRuntimeFilterSuite.scala:244)
at
org.apache.spark.sql.InjectRuntimeFilterSuite.assertDidNotRewriteWithBloomFilter(InjectRuntimeFilterSuite.scala:335)
at
org.apache.spark.sql.InjectRuntimeFilterSuite.$anonfun$new$29(InjectRuntimeFilterSuite.scala:474)
at
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
at
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
at
org.apache.spark.sql.InjectRuntimeFilterSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(InjectRuntimeFilterSuite.scala:31)
```
---
**Reason**
These tests fail when AQE is enabled. The physical plan before AQE kicks in is as follows:
```
SortMergeJoin [c5#XXX, f5#XXX], [c2#XXX, f2#XXX], Inner
:- Sort [c5#XXX ASC NULLS FIRST, f5#XXX ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c5#XXX, f5#XXX, 5), ENSURE_REQUIREMENTS,
[plan_id=XXX]
: +- Filter isnotnull(c5#XXX)
: +- FileScan parquet
spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX]
Batched: true, DataFilters: [isnotnull(c5#XXX)], Format: Parquet, Location:
InMemoryFileIndex(14 paths)[file:///.../table1/...,
PartitionFilters: [isnotnull(f5#XXX), dynamicpruningexpression(f5#XXX IN
dynamicpruning#XXX)], PushedFilters: [IsNotNull(c5)], ReadSchema:
struct<a5:int,b5:int,c5:int,d5:int,e5:int>
: +- SubqueryAdaptiveBroadcast dynamicpruning#XXX, 1, true,
Filter ((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND (isnotnull(c2#XXX) AND
isnotnull(f2#XXX))), [c2#XXX, f2#XXX]
: +- AdaptiveSparkPlan isFinalPlan=false
: +- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX,
e2#XXX, f2#XXX], (((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX))
AND isnotnull(f2#XXX))
: +- CometScan parquet
spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX]
Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62),
isnotnull(c2#XXX), isnotnull(f2#XXX)], Format:
CometParquet, Location: InMemoryFileIndex(1 paths)[file:///.../table2/...,
PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62),
IsNotNull(c2), IsNotNull(f2)], ReadSchema:
struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
+- CometSort [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, f2#XXX], [c2#XXX ASC
NULLS FIRST, f2#XXX ASC NULLS FIRST]
+- CometExchange hashpartitioning(c2#XXX, f2#XXX, 5),
ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=XXX]
+- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, f2#XXX],
(((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX)) AND
isnotnull(f2#XXX))
+- CometScan parquet
spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX]
Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62),
isnotnull(c2#XXX), isnotnull(f2#XXX)], Format: CometParquet,
Location: InMemoryFileIndex(1 paths)[file:///.../table2/...,
PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62),
IsNotNull(c2), IsNotNull(f2)], ReadSchema:
struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
```
The stacktrace is thrown while AQE is creating query stages, inside the method
[newQueryStage](https://github.com/apache/spark/blob/d3d84e045cc484cf7b70d36410a554238d7aef0e/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L552).
`newQueryStage` is called once all child stages of the current physical node have been
materialized; at that point AQE has the opportunity to apply rewrite rules to optimize the
physical plan.
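For reference, the check that produces this exception looks roughly like the following. This is a paraphrased sketch of the `ShuffleExchangeLike` branch of `newQueryStage`, not an exact copy of the Spark source: after the post-stage-creation rules (which include the session's columnar rules) run on top of the exchange, the result must still be a `ShuffleExchangeLike`.
```scala
// Paraphrased sketch of AdaptiveSparkPlanExec.newQueryStage (simplified;
// see the linked Spark source for the real code).
case s: ShuffleExchangeLike =>
  // Apply the post-stage-creation rules, which include
  // ApplyColumnarRulesAndInsertTransitions over the session's columnar rules.
  val newShuffle = applyPhysicalRules(
    s.withNewChildren(Seq(optimizedPlan)),
    postStageCreationRules(outputsColumnar = s.supportsColumnar))
  // The rewritten plan must still have a shuffle exchange at its root.
  if (!newShuffle.isInstanceOf[ShuffleExchangeLike]) {
    throw new IllegalStateException(
      "Custom columnar rules cannot transform shuffle node to something else.")
  }
  ShuffleQueryStageExec(currentStageId, newShuffle, s.canonicalized)
```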
Since the following sub-plan doesn't have any child stages, it is considered already materialized:
```
CreateStageResult(FileScan parquet
spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX]
Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(14
paths)[file:///.../table1/...,
PartitionFilters: [isnotnull(f5#XXX), dynamicpruningexpression(f5#XXX IN
dynamicpruning#XXX)], PushedFilters: [], ReadSchema:
struct<a5:int,b5:int,c5:int,d5:int,e5:int>
+- SubqueryAdaptiveBroadcast dynamicpruning#XXX, 0, true, Filter
((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX)), [c2#XXX]
+- AdaptiveSparkPlan isFinalPlan=false
+- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, f2#XXX],
((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX))
+- CometScan parquet
spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX]
Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62),
isnotnull(c2#XXX)], Format: CometParquet, Location:
InMemoryFileIndex(1 paths)[file:///.../table2/..., PartitionFilters: [],
PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2)], ReadSchema:
struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
,true,List())
```
`newQueryStage` is then called with the exchange that sits on top of the above sub-plan, like so:
```
Exchange hashpartitioning(f5#XXX, 5), ENSURE_REQUIREMENTS, [plan_id=XXX]
+- FileScan parquet
spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX]
Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(14
paths)[file:///.../table1/..., PartitionFilters:
[isnotnull(f5#XXX), dynamicpruningexpression(f5#XXX IN
dynamicpruning#XXX)], PushedFilters: [], ReadSchema:
struct<a5:int,b5:int,c5:int,d5:int,e5:int>
+- SubqueryAdaptiveBroadcast dynamicpruning#XXX, 0, true, Filter
((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX)), [c2#XXX]
+- AdaptiveSparkPlan isFinalPlan=false
+- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX,
f2#XXX], ((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX))
+- CometScan parquet
spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX]
Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62),
isnotnull(c2#XXX)], Format: CometParquet, Location:
InMemoryFileIndex(1 paths)[file:///.../table2/..., PartitionFilters: [],
PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2)], ReadSchema:
struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
```
Further down the stack in this method, the columnar rule
[ApplyColumnarRulesAndInsertTransitions](https://github.com/apache/spark/blob/d3d84e045cc484cf7b70d36410a554238d7aef0e/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L566)
is applied.
Before the post-columnar rules run, the plan is wrapped in a
[ColumnarToRowExec](https://github.com/apache/spark/blob/d3d84e045cc484cf7b70d36410a554238d7aef0e/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L545).
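At that point the stage plan looks roughly like this (details elided; the shape is inferred from the rewritten plan shown further below):
```
ColumnarToRowExec
+- CometExchange hashpartitioning(...), ENSURE_REQUIREMENTS, CometNativeShuffle
   +- CometFilter [...], isnotnull(c5#XXX)
      +- CometScan parquet spark_catalog.default.table1[...]
```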
Later the plan is passed to the Comet rewriter, where
`EliminateRedundantTransitions` is applied.
This rule has the following case:
```
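// From Comet's EliminateRedundantTransitions: when a ColumnarToRowExec has a
// Comet native child, replace it with Comet's CometColumnarToRowExec and
// propagate the original node's logical link (or clear the tag if it has none).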
case c @ ColumnarToRowExec(child) if hasCometNativeChild(child) =>
val op = CometColumnarToRowExec(child)
if (c.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
c.logicalLink.foreach(op.setLogicalLink)
}
op
```
We can see that it replaces the `ColumnarToRowExec` with a `CometColumnarToRowExec`.
All of this happens within the `newQueryStage` method. `newQueryStage` receives the
rewritten plan with `CometColumnarToRowExec` as its top-level physical node. Since this
node is not an `Exchange`, `newQueryStage` throws.
In this test case, the top-level node was expected to be of type `ShuffleExchangeLike`
but was actually a `CometColumnarToRowExec`.
Here is the rewritten plan that triggered the exception:
```
CometColumnarToRow
+- CometExchange hashpartitioning(c5#XXX, f5#XXX, 5), ENSURE_REQUIREMENTS,
CometNativeShuffle, [plan_id=XXX]
+- CometFilter [a5#XXX, b5#XXX, c5#XXX, d5#XXX, e5#XXX, f5#XXX],
isnotnull(c5#XXX)
+- CometScan parquet
spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX]
Batched: true, DataFilters: [isnotnull(c5#XXX)], Format: CometParquet,
Location: InMemoryFileIndex(14
paths)[file:///.../table1/..., PartitionFilters: [isnotnull(f5#XXX),
dynamicpruningexpression(true)], PushedFilters: [IsNotNull(c5)], ReadSchema:
struct<a5:int,b5:int,c5:int,d5:int,e5:int>
```
We can see that `CometColumnarToRow` sits on top of `CometExchange`, which is an instance
of `ShuffleExchangeLike`. Ideally the top-level operator should be the `CometExchange`
itself, but it is not.
It looks like this case in `EliminateRedundantTransitions` needs to handle this scenario.
This might also be related to #1798, although I am not sure.
cc @andygrove
### Steps to reproduce
I followed the steps described at
https://datafusion.apache.org/comet/contributor-guide/spark-sql-tests.html,
then enabled and ran the following tests in `InjectRuntimeFilterSuite.scala`
(one possible invocation is sketched after the list):
- Runtime bloom filter join: do not add bloom filter if dpp filter exists on
the same column
- Runtime bloom filter join: add bloom filter if dpp filter exists on a
different column
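For reference, one possible way to run the suite from the patched Spark checkout; this is a sketch only, and it assumes the `ENABLE_COMET` environment variable from the linked guide (the exact command depends on your setup):
```
ENABLE_COMET=true build/sbt "sql/testOnly org.apache.spark.sql.InjectRuntimeFilterSuite"
```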
### Expected behavior
_No response_
### Additional context
_No response_