andygrove commented on PR #4519:
URL: 
https://github.com/apache/datafusion-comet/pull/4519#issuecomment-4802446023

   I pulled the branch and wrote a couple of tests to pin down a stage-boundary 
concern. In non-AQE mode a `GROUP BY` plans as `partial-agg -> 
CometExchange(CometNativeShuffle) -> final-agg`, two stages. When the result 
stage exceeds the threshold, `applyForNonAQE` calls 
`revertStageIfNeeded(withRevertedStages, outputColumnar = false)` which runs 
`revertToSpark` over the whole tree. `revertToSpark` (and `insertTransitions`) 
do not stop at the `ShuffleExchangeLike` boundary that `countTransitions` 
carefully respects, so they revert the partial aggregate that lives below the 
native shuffle, in a separate stage that had zero transitions.
   
   The result is an invalid plan. The native `CometExchange` is columnar and 
requires columnar input, but after the revert it is fed a row-based 
`HashAggregate` with no `RowToColumnarExec`. `insertTransitions` does not 
repair it because it only inserts C2R and skips the shuffle (the shuffle's 
`supportsColumnar` is true).
   
   Before revert (valid):
   
   ```
   HashAggregate(final)
   +- ColumnarToRow
      +- CometExchange (CometNativeShuffle)
         +- CometHashAggregate(partial)
            +- CometScan parquet
   ```
   
   After `revertToSpark` (invalid, native shuffle now has a row child):
   
   ```
   HashAggregate(final)
   +- ColumnarToRow
      +- CometExchange (CometNativeShuffle)
         +- HashAggregate(partial)        <- reverted to row, no RowToColumnarExec
            +- ColumnarToRow
               +- FileScan parquet
   ```
   
   It reproduces both at the `revertToSpark` unit level and through the real 
`rule.apply` path in non-AQE at threshold 0. Both tests below fail on the 
current branch. The fix direction would be to make `revertToSpark` and 
`insertTransitions` honor the same stage boundaries as `countTransitions` (stop 
at `QueryStageExec`, `ShuffleExchangeLike`, `BroadcastExchangeLike`) so a 
stage-scoped revert stays inside its stage. The AQE result-stage `case _` pass 
has the same whole-subtree traversal, so it is worth checking there too.
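   
   As a rough illustration of that direction, here is a minimal, self-contained 
sketch on a toy plan model. `PlanNode`, `NativeOp`, `RowOp`, `ShuffleBoundary` 
and `revertWithinStage` are hypothetical names, not Comet or Spark classes, and 
the sketch ignores transition insertion entirely. It only shows a revert 
traversal that stops at the stage boundary instead of descending through it.
   
   ```scala
   // Toy plan model: hypothetical stand-ins, NOT Comet or Spark classes.
   sealed trait PlanNode { def children: Seq[PlanNode] }

   // A columnar native operator (stands in for a Comet operator).
   case class NativeOp(name: String, children: Seq[PlanNode]) extends PlanNode

   // A row-based operator (stands in for the Spark fallback).
   case class RowOp(name: String, children: Seq[PlanNode]) extends PlanNode

   // A stage boundary (stands in for a shuffle or broadcast exchange).
   case class ShuffleBoundary(child: PlanNode) extends PlanNode {
     def children: Seq[PlanNode] = Seq(child)
   }

   object StageScopedRevert {
     // Reverts native operators to row-based ones within the current stage only.
     // The boundary node and everything below it are returned unchanged.
     def revertWithinStage(plan: PlanNode): PlanNode = plan match {
       case boundary: ShuffleBoundary => boundary
       case NativeOp(name, kids) => RowOp(name, kids.map(revertWithinStage))
       case RowOp(name, kids) => RowOp(name, kids.map(revertWithinStage))
     }
   }
   ```
   
   With this shape, reverting a native `final-agg` that sits above a 
`ShuffleBoundary` rewrites only `final-agg`. The partial aggregate below the 
boundary stays native, so the shuffle keeps its columnar child.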
   
   Suggested tests to add (they pass once the revert respects stage boundaries):
   
   ```scala
   // add import: org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
   
   /**
    * Returns every node that produces a columnar output but consumes a row-based child without a
    * RowToColumnar transition. Such a node is an invalid columnar/row boundary: a columnar parent
    * (e.g. a native CometShuffleExchangeExec) requires columnar input. RowToColumnarExec and
    * CometSparkToColumnarExec are the legitimate row->columnar bridges and are excluded.
    */
   private def invalidColumnarBoundaries(plan: SparkPlan): Seq[SparkPlan] = {
     plan.collect {
       case n
           if n.supportsColumnar && !n.isInstanceOf[RowToColumnarTransition] &&
             n.children.exists(c => !c.supportsColumnar) =>
         n
     }
   }
   
   test("revertToSpark must not revert native operators across a shuffle stage boundary") {
     withSQLConf("spark.sql.adaptive.enabled" -> "false") {
       withParquetTable((0 until 100).map(i => (i, i % 10)), "tbl") {
         // A GROUP BY produces partial-agg -> native shuffle -> final-agg, i.e. two stages.
         val df = sql("SELECT _2, count(*) FROM tbl GROUP BY _2")
         df.collect()
         val cometPlan = stripAQEPlan(df.queryExecution.executedPlan)
   
         val shuffles = cometPlan.collect { case s: CometShuffleExchangeExec => s }
         assume(shuffles.nonEmpty, "test requires a native CometShuffleExchangeExec")
         assert(
           shuffles.map(s => countCometExecs(s.child)).sum > 0,
           "expected native CometExec operators below the shuffle")
         assert(
           invalidColumnarBoundaries(cometPlan).isEmpty,
           s"precondition: original plan should be valid:\n${cometPlan.treeString}")
   
         val rule = RevertNativeForTransitionHeavyStages(spark)
         val reverted = rule.revertToSpark(cometPlan)
   
         val invalid = invalidColumnarBoundaries(reverted)
         assert(
           invalid.isEmpty,
           s"revertToSpark produced invalid columnar/row boundaries " +
             s"(${invalid.map(_.nodeName).mkString(", ")}):\n${reverted.treeString}")
       }
     }
   }
   
   test("non-AQE apply must not produce an invalid plan when the result stage reverts") {
     withSQLConf(
       CometConf.COMET_EXEC_TRANSITION_REVERT_ENABLED.key -> "true",
       // Threshold 0 forces the result stage (above the topmost shuffle) to revert.
       CometConf.COMET_EXEC_TRANSITION_REVERT_MAX_TRANSITIONS.key -> "0",
       "spark.sql.adaptive.enabled" -> "false") {
       withParquetTable((0 until 100).map(i => (i, i % 10)), "tbl") {
         val cometPlan = withSQLConf(
           CometConf.COMET_EXEC_TRANSITION_REVERT_ENABLED.key -> "false") {
           val df = sql("SELECT _2, count(*) FROM tbl GROUP BY _2")
           df.collect()
           stripAQEPlan(df.queryExecution.executedPlan)
         }
         assume(
           cometPlan.collect { case s: CometShuffleExchangeExec => s }.nonEmpty,
           "test requires a native CometShuffleExchangeExec")
   
         val rule = RevertNativeForTransitionHeavyStages(spark)
         val result = rule.apply(cometPlan)
   
         val invalid = invalidColumnarBoundaries(result)
         assert(
           invalid.isEmpty,
           s"rule.apply produced invalid columnar/row boundaries " +
             s"(${invalid.map(_.nodeName).mkString(", ")}):\n${result.treeString}")
       }
     }
   }
   ```
   
   Disclosure: I investigated this and drafted this comment with help from AI 
(Claude Code).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

