advancedxy commented on code in PR #100:
URL: 
https://github.com/apache/arrow-datafusion-comet/pull/100#discussion_r1503833704


##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -451,6 +468,23 @@ class CometSparkSessionExtensions
       }
     }
   }
+
+  // CometExec already wraps a `ColumnarToRowExec` for row-based operators. Therefore,
+  // `ColumnarToRowExec` is redundant and can be eliminated.
+  //
+  // It was added during ApplyColumnarRulesAndInsertTransitions' insertTransitions phase when Spark
+  // requests row-based output such as `collect` call. It's correct to add a redundant
+  // `ColumnarToRowExec` for `CometExec`. However, for certain operators such as
+  // `CometCollectLimitExec` which overrides `executeCollect`, the redundant `ColumnarToRowExec`
+  // makes the override ineffective. The purpose of this rule is to eliminate the redundant
+  // `ColumnarToRowExec` for such operators.
+  case class EliminateRedundantColumnarToRow(session: SparkSession) extends Rule[SparkPlan] {

Review Comment:
   Yeah, I didn't add a test case for this part. As noted in the code comment, the plan is correct either way, with or without the `ColumnarToRowExec` on top of a `CometExec`.
   
   `CollectLimitExec`'s `executeCollect` is optimized: it uses `executeTake` to take rows from the child operator. Unlike `CollectLimitExec.doExecute()` or `TakeOrderedAndProjectExec.doExecute()`, which shuffle all the data into a single partition and then take the limited rows from that shuffled partition, `executeTake` retrieves rows directly from the child's RDD, partition by partition, without a shuffle.
   
   Take the following code as an example: `sql("select * from a_very_large_table limit 100").collect()`. `CollectLimitExec`'s `executeCollect` will try to get the first 100 rows from the first partition, then from the next 2 partitions if the previous partition doesn't contain 100 rows, then from the next 4 partitions, and so on, **without a shuffle**.
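
   To make the scan order concrete, here is a minimal sketch of the idea in plain Scala. It is a simplified model, not Spark's code: partitions are in-memory sequences, `IncrementalTakeSketch` and `take` are hypothetical names, and the batch size simply doubles (1, 2, 4, ...) as described above, whereas Spark's real `executeTake` picks the next batch size with its own heuristic.

```scala
object IncrementalTakeSketch {
  /** Returns the first `n` rows and the number of partitions that were scanned.
    * Assumption: each partition is modeled as an in-memory Seq of rows. */
  def take[T](partitions: IndexedSeq[Seq[T]], n: Int): (List[T], Int) = {
    val buf = scala.collection.mutable.ArrayBuffer.empty[T]
    var scanned = 0
    var batch = 1 // partitions to scan in this round: 1, then 2, then 4, ...
    while (buf.size < n && scanned < partitions.length) {
      val end = math.min(scanned + batch, partitions.length)
      // Only the partitions in this batch are read; nothing is shuffled.
      for (p <- scanned until end) {
        buf ++= partitions(p).take(n - buf.size)
      }
      scanned = end
      batch *= 2
    }
    (buf.toList, scanned)
  }
}
```

   With five small partitions and a limit of 4, this model stops after the first three partitions and never touches the remaining two, which is the saving the shuffle-based path cannot offer.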
   
   I modeled this behavior (see https://github.com/apache/arrow-datafusion-comet/pull/100/files#diff-50c88b1d9b68e7ba24cb6fad9a4f20ea1b8fa63c3c868578db151b83182c627fR57) in `CometCollectLimitExec` as well. However, without this rule, an additional `ColumnarToRowExec` operator is wrapped on top of `CometCollectLimitExec`, which makes the override ineffective.
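
   A toy model of why the wrapper defeats the override, and what removing it restores. All names here (`Plan`, `LimitNode`, `ToRowWrapper`, `eliminateRedundantWrapper`) are hypothetical stand-ins, not Spark or Comet classes, and the rule body is only an illustration of the idea, not the code in this PR.

```scala
object RedundantTransitionSketch {
  // Toy stand-in for a physical plan node; not Spark's SparkPlan.
  trait Plan {
    def execute(): String                    // the general execution path
    def executeCollect(): String = execute() // default collect path delegates to execute()
  }

  // Toy stand-in for an operator with an optimized collect path.
  case object LimitNode extends Plan {
    def execute(): String = "shuffle-based limit"
    override def executeCollect(): String = "incremental take"
  }

  // Toy stand-in for a row-transition wrapper: it only ever calls child.execute(),
  // so the child's executeCollect override is never reached.
  final case class ToRowWrapper(child: Plan) extends Plan {
    def execute(): String = child.execute()
  }

  // Toy version of the rule: drop the wrapper when it sits directly on LimitNode.
  def eliminateRedundantWrapper(plan: Plan): Plan = plan match {
    case ToRowWrapper(LimitNode) => LimitNode
    case other                   => other
  }
}
```

   In this model, calling `executeCollect()` on `ToRowWrapper(LimitNode)` goes through the wrapper's default path and ends up in `LimitNode.execute()`; after `eliminateRedundantWrapper`, the same call reaches the optimized override.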


