advancedxy commented on code in PR #100:
URL:
https://github.com/apache/arrow-datafusion-comet/pull/100#discussion_r1503833704
##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -451,6 +468,23 @@ class CometSparkSessionExtensions
}
}
}
+
+ // CometExec already wraps a `ColumnarToRowExec` for row-based operators. Therefore,
+ // `ColumnarToRowExec` is redundant and can be eliminated.
+ //
+ // It was added during ApplyColumnarRulesAndInsertTransitions' insertTransitions phase when Spark
+ // requests row-based output such as `collect` call. It's correct to add a redundant
+ // `ColumnarToRowExec` for `CometExec`. However, for certain operators such as
+ // `CometCollectLimitExec` which overrides `executeCollect`, the redundant `ColumnarToRowExec`
+ // makes the override ineffective. The purpose of this rule is to eliminate the redundant
+ // `ColumnarToRowExec` for such operators.
+ case class EliminateRedundantColumnarToRow(session: SparkSession) extends Rule[SparkPlan] {
Review Comment:
Yeah, I didn't add a test case for this part. As noted in the comment, it's correct either to add or to remove the `ColumnarToRowExec` on top of a `CometExec`.
`CollectLimitExec`'s `executeCollect` is optimized by using `executeTake` to take rows from the child operator. Unlike `CollectLimitExec.doExecute()` or `TakeOrderedAndProjectExec.doExecute()`, which shuffle all the data into a single partition and then take the limited rows from that partition, `executeTake` retrieves rows directly from the child's RDD, partition by partition, without a shuffle.
Take the following code as an example: `sql("select * from a_very_large_table limit 100").collect()`. `CollectLimitExec`'s `executeCollect` will try to get the first 100 rows from the first partition, then from the next 2 partitions if the partitions read so far don't contain 100 rows, then from the next 4 partitions, and so on, **without a shuffle**.
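A minimal, Spark-free sketch of the incremental scan described above, assuming partitions are plain in-memory sequences and that the batch size simply doubles (1, 2, 4, ...) as in the example. `IncrementalTakeSketch`, `TakeResult`, and `incrementalTake` are hypothetical names. Spark's own `SparkPlan.executeTake` sizes each new batch from the number of rows found so far, capped by a multiple controlled by `spark.sql.limit.scaleUpFactor`, so the exact batch sizes differ from this model.

```scala
import scala.collection.mutable.ArrayBuffer

object IncrementalTakeSketch {
  // Hypothetical result type: the rows taken plus how many partitions were read.
  final case class TakeResult[T](rows: Seq[T], partitionsScanned: Int)

  // Simplified model of a limit-driven collect: read partitions in batches of
  // 1, 2, 4, ... until `limit` rows are found or the partitions run out.
  // Rows are read where they live; nothing is moved between partitions (no shuffle).
  def incrementalTake[T](partitions: IndexedSeq[Seq[T]], limit: Int): TakeResult[T] = {
    val buf = ArrayBuffer.empty[T]
    var scanned = 0
    var batchSize = 1 // assumption: plain doubling, not Spark's adaptive sizing
    while (buf.size < limit && scanned < partitions.size) {
      val end = math.min(scanned + batchSize, partitions.size)
      // In Spark, each batch would be one job over partitions [scanned, end).
      for (p <- scanned until end) {
        buf ++= partitions(p).take(limit - buf.size)
      }
      scanned = end
      batchSize *= 2
    }
    TakeResult(buf.toList, scanned)
  }

  def main(args: Array[String]): Unit = {
    // 8 partitions of 30 rows each; 100 rows are requested.
    val parts = (0 until 8).map(p => (0 until 30).map(i => p * 30 + i))
    val result = incrementalTake(parts, 100)
    println(s"rows=${result.rows.size} partitionsScanned=${result.partitionsScanned}")
  }
}
```

With 8 partitions of 30 rows and a limit of 100, the sketch reads partition 0, then partitions 1-2, then partitions 3-6, and prints `rows=100 partitionsScanned=7`; the last partition is never touched.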
I modeled this behavior (see https://github.com/apache/arrow-datafusion-comet/pull/100/files#diff-50c88b1d9b68e7ba24cb6fad9a4f20ea1b8fa63c3c868578db151b83182c627fR57) in `CometCollectLimitExec` as well. However, without this rule, an additional `ColumnarToRowExec` operator is wrapped on top of `CometCollectLimitExec`, which makes the override ineffective.
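The effect of the rule can be reproduced with a toy plan tree. In this sketch `Plan`, `Scan`, `CollectLimit`, and `ColumnarToRow` are hypothetical stand-ins for `SparkPlan`, a leaf scan, `CometCollectLimitExec`, and `ColumnarToRowExec`, and the string label only records which collect path ran. The rule here matches only at the root of the plan, where the transition for a `collect` call ends up; whether the real `EliminateRedundantColumnarToRow` matches exactly this shape is an assumption.

```scala
object EliminateRedundantColumnarToRowSketch {
  // Toy stand-ins for plan nodes; these are not Spark's real classes.
  sealed trait Plan {
    // Generic execution path, analogous to doExecute(): produces every row.
    def execute(): Seq[Int]
    // Default collect path: run the generic path and gather its output.
    def executeCollect(): (String, Seq[Int]) = ("generic", execute())
  }

  final case class Scan(rows: Seq[Int]) extends Plan {
    def execute(): Seq[Int] = rows
  }

  // Analogue of CometCollectLimitExec: overrides executeCollect with its own
  // path (in Comet, the partition-by-partition take).
  final case class CollectLimit(limit: Int, child: Plan) extends Plan {
    def execute(): Seq[Int] = child.execute().take(limit)
    override def executeCollect(): (String, Seq[Int]) =
      ("take", child.execute().take(limit))
  }

  // Analogue of ColumnarToRowExec: it does not forward executeCollect, so a
  // child's override is never reached while this wrapper sits on top.
  final case class ColumnarToRow(child: Plan) extends Plan {
    def execute(): Seq[Int] = child.execute()
  }

  // The rule: drop a ColumnarToRow sitting directly on a CollectLimit.
  def eliminateRedundantColumnarToRow(plan: Plan): Plan = plan match {
    case ColumnarToRow(child: CollectLimit) => child
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val planned = ColumnarToRow(CollectLimit(3, Scan(1 to 10)))
    println(planned.executeCollect()._1) // generic
    println(eliminateRedundantColumnarToRow(planned).executeCollect()._1) // take
  }
}
```

Both paths return the same rows here; the point is that the overridden `executeCollect` only runs once the wrapper has been removed.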
--
This is an automated message from the Apache Git Service.