karuppayya opened a new issue, #4518:
URL: https://github.com/apache/datafusion-comet/issues/4518
### What is the problem the feature request solves?
When Comet converts a stage's operators to native columnar execution but
several of them cannot run natively (for example UDFs, SortAggregate, or
unsupported expressions), the stage accumulates multiple columnar-to-row
(C2R) and row-to-columnar (R2C) transitions. With columnar shuffle
enabled, each C2R has a corresponding R2C, so data repeatedly bounces between
the two formats.
When many operators in a stage fall back, the conversion overhead of these
round-trips can exceed the benefit of running the few remaining operators
natively.
Currently Comet decides per operator whether to convert, with no awareness
of how many transitions the stage as a whole accumulates. This proposal
introduces a stage-aware fallback: evaluate the stage after all operators
have been converted and transitions inserted, then revert the entire stage
to Spark if the transition count is too high.
Example: a stage with 3 C2R transitions:
```
CometShuffleExchangeExec
└── CometProject
    └── ColumnarToRow
        └── SortAggregate (unsupported)
            └── RowToColumnar
                └── CometHashAggregate
                    └── ColumnarToRow
                        └── UDF Filter (unsupported)
                            └── RowToColumnar
                                └── CometHashAggregate
                                    └── ColumnarToRow
                                        └── SortAggregate (unsupported)
                                            └── QueryStageExec
```
This stage pays 3 C2R + 3 R2C = 6 format conversions while only running 3
operators natively.
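
The C2R count for the example stage can be reproduced with a small sketch. This is a toy model only: `PlanNode`, `chain`, and `count_c2r` are hypothetical names invented for illustration, not Spark or Comet APIs, and the plan is modeled as a simple linear chain of operator names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PlanNode:
    """Toy stand-in for a physical plan operator with at most one child."""
    name: str
    child: Optional["PlanNode"] = None


def chain(*names: str) -> Optional[PlanNode]:
    """Build a linear plan from top (first name) to bottom (last name)."""
    node = None
    for name in reversed(names):
        node = PlanNode(name, node)
    return node


def count_c2r(plan: Optional[PlanNode]) -> int:
    """Count ColumnarToRow nodes, stopping at the stage boundary."""
    count = 0
    node = plan
    while node is not None and node.name != "QueryStageExec":
        if node.name == "ColumnarToRow":
            count += 1
        node = node.child
    return count


# The example stage from the issue, listed top-down.
example_stage = chain(
    "CometShuffleExchangeExec",
    "CometProject",
    "ColumnarToRow",
    "SortAggregate",
    "RowToColumnar",
    "CometHashAggregate",
    "ColumnarToRow",
    "UDF Filter",
    "RowToColumnar",
    "CometHashAggregate",
    "ColumnarToRow",
    "SortAggregate",
    "QueryStageExec",
)

print(count_c2r(example_stage))  # prints 3
```

A real rule would walk a `SparkPlan` tree with possibly several children per node; the linear chain here only mirrors the shape of the example.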
### Describe the potential solution
Add a `postColumnarTransitions` rule that counts C2R transitions per stage and,
if the count exceeds a configurable threshold, reverts the stage to Spark
row-based execution using each `CometExec.originalPlan`. A single
`RowToColumnarExec` is inserted at the output to feed the columnar shuffle.
After revert:
```
CometShuffleExchangeExec
└── RowToColumnarExec
    └── ProjectExec
        └── SortAggregate
            └── HashAggregateExec
                └── FilterExec
                    └── HashAggregateExec
                        └── SortAggregate
                            └── QueryStageExec
```
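
The revert decision could look roughly like the sketch below. It is a toy model under stated assumptions, not Comet code: `Op`, `revert_if_costly`, and the `max_c2r` threshold name are hypothetical, the `original` field only mimics the role of `CometExec.originalPlan`, and the stage is a shortened, made-up one represented as a top-down list.

```python
from dataclasses import dataclass
from typing import List, Optional

TRANSITIONS = {"ColumnarToRow", "RowToColumnar"}


@dataclass
class Op:
    """Toy operator; `original` mimics the role of CometExec.originalPlan."""
    name: str
    original: Optional[str] = None  # Spark operator a native op replaced


def revert_if_costly(stage: List[Op], max_c2r: int) -> List[Op]:
    """Revert a top-down linear stage when it has more than max_c2r C2R nodes.

    stage[0] is assumed to be the columnar shuffle and stage[-1] the stage input.
    """
    c2r = sum(1 for op in stage if op.name == "ColumnarToRow")
    if c2r <= max_c2r:
        return stage  # few enough transitions: keep the native plan
    shuffle, body, leaf = stage[0], stage[1:-1], stage[-1]
    row_ops = [
        Op(op.original) if op.original else op
        for op in body
        if op.name not in TRANSITIONS  # transitions vanish in an all-row stage
    ]
    # One RowToColumnarExec at the top feeds the columnar shuffle.
    return [shuffle, Op("RowToColumnarExec"), *row_ops, leaf]


# A shortened, hypothetical stage with 2 C2R transitions.
stage = [
    Op("CometShuffleExchangeExec"),
    Op("CometProject", original="ProjectExec"),
    Op("ColumnarToRow"),
    Op("SortAggregate"),
    Op("RowToColumnar"),
    Op("CometHashAggregate", original="HashAggregateExec"),
    Op("ColumnarToRow"),
    Op("FilterExec"),
    Op("QueryStageExec"),
]

reverted = revert_if_costly(stage, max_c2r=1)
print([op.name for op in reverted])
```

With `max_c2r=1` the two C2R nodes exceed the threshold, so the native operators are swapped for their recorded originals and all interior transitions are dropped; with `max_c2r=2` the stage is returned unchanged.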
### Additional context
This is conceptually similar to Spark's
[SimpleCostEvaluator](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/simpleCosting.scala#L42)
in AQE, which uses the number of shuffle nodes as a cost signal to decide
whether to accept a re-optimized plan. Here we use the number of C2R
transitions as a cost signal to decide whether native execution is
worthwhile for a given stage.