sunchao commented on code in PR #186:
URL:
https://github.com/apache/arrow-datafusion-comet/pull/186#discussion_r1522628382
##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -222,12 +222,16 @@ class CometSparkSessionExtensions
*/
// spotless:on
private def transform(plan: SparkPlan): SparkPlan = {
- def transform1(op: UnaryExecNode): Option[Operator] = {
- op.child match {
- case childNativeOp: CometNativeExec =>
- QueryPlanSerde.operator2Proto(op, childNativeOp.nativeOp)
- case _ =>
- None
+ def transform1(op: SparkPlan): Option[Operator] = {
Review Comment:
Curious whether this change is related to this PR.
##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -378,6 +382,31 @@ class CometSparkSessionExtensions
case None => b
}
+ // For AQE shuffle stage on a Comet shuffle exchange
+ case s @ ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) =>
+ val newOp = transform1(s)
+ newOp match {
+ case Some(nativeOp) =>
+ CometSinkPlaceHolder(nativeOp, s, s)
+ case None =>
+ s
+ }
+
+ // For AQE shuffle stage on a reused Comet shuffle exchange
+ // Note that we don't need to handle `ReusedExchangeExec` for non-AQE case, because
+ // the query plan won't be re-optimized/planned in non-AQE mode.
+ case s @ ShuffleQueryStageExec(
Review Comment:
Curious whether we can replace this rule with the following:
```scala
case ReusedExchangeExec(_, op) => op
```