kazuyukitanimura commented on code in PR #178:
URL:
https://github.com/apache/arrow-datafusion-comet/pull/178#discussion_r1522632265
##########
core/src/execution/datafusion/planner.rs:
##########
@@ -1026,6 +1106,15 @@ impl From<ExpressionError> for DataFusionError {
}
}
+/// Returns true if given operator probably returns input array as output
array without
Review Comment:
nit: `probably returns` -> `can return` ?
##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -222,12 +222,16 @@ class CometSparkSessionExtensions
*/
// spotless:on
private def transform(plan: SparkPlan): SparkPlan = {
- def transform1(op: UnaryExecNode): Option[Operator] = {
- op.child match {
- case childNativeOp: CometNativeExec =>
- QueryPlanSerde.operator2Proto(op, childNativeOp.nativeOp)
- case _ =>
- None
+ def transform1(op: SparkPlan): Option[Operator] = {
+ val allNativeExec = op.children.map {
+ case childNativeOp: CometNativeExec => Some(childNativeOp.nativeOp)
+ case _ => None
+ }
+
+ if (allNativeExec.forall(_.isDefined)) {
+ QueryPlanSerde.operator2Proto(op, allNativeExec.map(_.get): _*)
+ } else {
+ None
Review Comment:
nit: I feel this can become a little easier if we do
```
if (op.children.forall(_.isInstanceOf[CometNativeExec])) {
QueryPlanSerde.operator2Proto(op,
op.children.map(_.asInstanceOf[CometNativeExec].nativeOp): _*)
} else {
None
}
```
##########
spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala:
##########
@@ -58,6 +58,53 @@ class CometExecSuite extends CometTestBase {
}
}
+ // TODO: Add a test for SortMergeJoin with join filter after new DataFusion
release
+ test("SortMergeJoin without join filter") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
+ withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") {
+ val df1 = sql("SELECT * FROM tbl_a JOIN tbl_b ON tbl_a._2 =
tbl_b._1")
+ checkSparkAnswerAndOperator(df1)
Review Comment:
Should we add checks to make sure the plan includes the Comet SMJ, e.g.
`stripAQEPlan(df.queryExecution.executedPlan).collectFirst...`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]