marin-ma commented on code in PR #10991:
URL:
https://github.com/apache/incubator-gluten/pull/10991#discussion_r2486018463
##########
backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala:
##########
@@ -30,30 +31,50 @@ import
org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQuery
*/
case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan]
{
override def apply(plan: SparkPlan): SparkPlan = {
+ val resizeBatchesShuffleInputEnabled =
VeloxConfig.get.veloxResizeBatchesShuffleInput
+ val resizeBatchesShuffleOutputEnabled =
VeloxConfig.get.veloxResizeBatchesShuffleOutput
+ if (!resizeBatchesShuffleInputEnabled &&
!resizeBatchesShuffleOutputEnabled) {
+ return plan
+ }
+
val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange
plan.transformUp {
- case shuffle: ColumnarShuffleExchangeExec
- if shuffle.shuffleWriterType == HashShuffleWriterType &&
- VeloxConfig.get.veloxResizeBatchesShuffleInput =>
+ case ColumnarResizeableShuffleExchangeExec(shuffle) if
resizeBatchesShuffleInputEnabled =>
Review Comment:
It seems this change assumes that a given shuffle writer type requires both
shuffle input and shuffle output resizing at the same time, but I don't think
that's necessarily true. A shuffle writer type may need input resizing but not
output resizing, or vice versa (although for the existing types the two
requirements happen to be the same).
Maybe we can generalize this by adding these flags to the ShuffleWriterType
trait:
```
trait ShuffleWriterType {
val name: String
val requiresResizingShuffleInput: Boolean
val requiresResizingShuffleOutput: Boolean
}
```
What do you think?
##########
backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala:
##########
@@ -30,30 +31,50 @@ import
org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQuery
*/
case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan]
{
override def apply(plan: SparkPlan): SparkPlan = {
+ val resizeBatchesShuffleInputEnabled =
VeloxConfig.get.veloxResizeBatchesShuffleInput
+ val resizeBatchesShuffleOutputEnabled =
VeloxConfig.get.veloxResizeBatchesShuffleOutput
+ if (!resizeBatchesShuffleInputEnabled &&
!resizeBatchesShuffleOutputEnabled) {
+ return plan
+ }
+
val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange
plan.transformUp {
- case shuffle: ColumnarShuffleExchangeExec
- if shuffle.shuffleWriterType == HashShuffleWriterType &&
- VeloxConfig.get.veloxResizeBatchesShuffleInput =>
+ case ColumnarResizeableShuffleExchangeExec(shuffle) if
resizeBatchesShuffleInputEnabled =>
val appendBatches =
VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
shuffle.withNewChildren(Seq(appendBatches))
- case a @ AQEShuffleReadExec(ShuffleQueryStageExec(_, _:
ColumnarShuffleExchangeExec, _), _)
- if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+ case a @ AQEShuffleReadExec(
+ ShuffleQueryStageExec(_, ColumnarResizeableShuffleExchangeExec(_),
_),
+ _) if resizeBatchesShuffleOutputEnabled =>
VeloxResizeBatchesExec(a, range.min, range.max)
// Since it's transformed in a bottom to up order, so we may first
encountered
// ShuffeQueryStageExec, which is transformed to
VeloxResizeBatchesExec(ShuffeQueryStageExec),
// then we see AQEShuffleReadExec
case a @ AQEShuffleReadExec(
VeloxResizeBatchesExec(
- s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _),
+ s @ ShuffleQueryStageExec(_,
ColumnarResizeableShuffleExchangeExec(_), _),
_,
_),
- _) if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+ _) if resizeBatchesShuffleOutputEnabled =>
VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max)
- case s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _)
- if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+ case s @ ShuffleQueryStageExec(_,
ColumnarResizeableShuffleExchangeExec(_), _)
+ if resizeBatchesShuffleOutputEnabled =>
VeloxResizeBatchesExec(s, range.min, range.max)
}
}
+
+ private object ColumnarResizeableShuffleExchangeExec {
+ def unapply(plan: SparkPlan): Option[ColumnarShuffleExchangeExec] = {
+ plan match {
+ // sort-based/rss-sort shuffle has already resized the batch size in
shuffle read,
+ // so no need to resize again
+ case shuffle: ColumnarShuffleExchangeExec
Review Comment:
nit: Perhaps a more readable way to write this logic:
```
case shuffle: ColumnarShuffleExchangeExec
if shuffle.shuffleWriterType != SortShuffleWriterType
&& shuffle.shuffleWriterType != RssSortShuffleWriterType =>
Some(shuffle)
case _ => None
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]