andygrove commented on code in PR #3989:
URL: https://github.com/apache/datafusion-comet/pull/3989#discussion_r3113173363
##########
spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala:
##########
@@ -223,39 +222,90 @@ object CometShuffleExchangeExec
with SQLConfHelper {
override def getSupportLevel(op: ShuffleExchangeExec): SupportLevel = {
- if (nativeShuffleSupported(op) || columnarShuffleSupported(op)) {
- Compatible()
- } else {
- Unsupported()
- }
+ if (shuffleSupported(op).isDefined) Compatible() else Unsupported()
}
override def createExec(
nativeOp: OperatorOuterClass.Operator,
op: ShuffleExchangeExec): CometNativeExec = {
- if (nativeShuffleSupported(op) && op.children.forall(_.isInstanceOf[CometNativeExec])) {
- // Switch to use Decimal128 regardless of precision, since Arrow native execution
- // doesn't support Decimal32 and Decimal64 yet.
- conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
- CometSinkPlaceHolder(
- nativeOp,
- op,
- CometShuffleExchangeExec(op, shuffleType = CometNativeShuffle))
-
- } else if (columnarShuffleSupported(op)) {
- CometSinkPlaceHolder(
- nativeOp,
- op,
- CometShuffleExchangeExec(op, shuffleType = CometColumnarShuffle))
- } else {
- throw new IllegalStateException()
+ shuffleSupported(op) match {
+ case Some(CometNativeShuffle) if op.children.forall(_.isInstanceOf[CometNativeExec]) =>
+ // Switch to use Decimal128 regardless of precision, since Arrow native execution
+ // doesn't support Decimal32 and Decimal64 yet.
+ conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
+ CometSinkPlaceHolder(
+ nativeOp,
+ op,
+ CometShuffleExchangeExec(op, shuffleType = CometNativeShuffle))
+ case Some(CometColumnarShuffle) =>
+ CometSinkPlaceHolder(
+ nativeOp,
+ op,
+ CometShuffleExchangeExec(op, shuffleType = CometColumnarShuffle))
+ case Some(CometNativeShuffle) =>
+ // Native was chosen but children are not native - fall through to columnar if possible.
+ // This can happen when getSupportLevel selected native but a later pass changed the plan.
+ throw new IllegalStateException(
+ "shuffleSupported chose native shuffle but children are not all CometNativeExec")
+ case None =>
+ throw new IllegalStateException()
+ }
+ }
+
+ /**
+ * Decide which Comet shuffle path (if any) can handle this shuffle. Returns `None` if neither
+ * native nor columnar shuffle can be used; in that case the node is tagged with the combined
+ * fallback reasons via `withInfos` so subsequent passes short-circuit via `hasExplainInfo`.
+ *
+ * This is the single coordination point: the two path-specific predicates
+ * (`nativeShuffleFailureReasons` / `columnarShuffleFailureReasons`) are pure - they return
+ * collected reasons but do not tag. Tagging only happens here, and only on total failure.
+ */
+ def shuffleSupported(s: ShuffleExchangeExec): Option[ShuffleType] = {
+ // Sticky: a prior rule pass (initial planning or an earlier AQE pass) already decided this
+ // shuffle falls back to Spark and tagged it. Preserve that decision - re-deriving it against
+ // a possibly-reshaped subtree (e.g. AQE stage-wrapping) can flip the answer and produce
+ // inconsistent plans across passes (see #3949).
+ if (hasExplainInfo(s)) return None
Review Comment:
https://github.com/apache/datafusion-comet/issues/4006
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]