sunchao commented on code in PR #178:
URL: https://github.com/apache/arrow-datafusion-comet/pull/178#discussion_r1526914198


##########
dev/diffs/3.4.2.diff:
##########
@@ -976,7 +978,35 @@ index 266bb343526..85ec36db996 100644
  
            val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
            val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -461,18 +471,29 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -451,28 +461,53 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+         val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
+           val executedPlan =
+             joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+-          assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+-          executedPlan.asInstanceOf[SortMergeJoinExec]
++          executedPlan match {
++            case s: SortMergeJoinExec => s
++            case b: CometExec =>

Review Comment:
   Why can't we check for `CometSortMergeJoinExec` here?



##########
spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala:
##########
@@ -58,6 +58,53 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  // TODO: Add a test for SortMergeJoin with join filter after new DataFusion release
+  test("SortMergeJoin without join filter") {

Review Comment:
   I think we should probably have a dedicated `CometJoinSuite` for join tests.



##########
core/src/execution/datafusion/planner.rs:
##########
@@ -1026,6 +1106,16 @@ impl From<ExpressionError> for DataFusionError {
     }
 }
 
+/// Returns true if given operator can return input array as output array without
+/// modification. This is used to determine if we need to copy the input batch to avoid
+/// data corruption from reusing the input batch.
+fn op_reuse_array(op: &Arc<dyn ExecutionPlan>) -> bool {

Review Comment:
   nit: I wonder if we could call this `can_reuse_input_batch` instead.
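
   A rough, self-contained sketch of how the renamed helper might read. The `ExecutionPlan` trait and the `ScanExec`/`SortExec` structs below are hypothetical stand-ins rather than the real DataFusion or Comet types, and treating only the scan as pass-through is an assumption made purely for illustration:

```rust
use std::any::Any;
use std::sync::Arc;

// Hypothetical stand-in for DataFusion's `ExecutionPlan` trait; only the
// `as_any` hook needed for type checks is modelled here.
trait ExecutionPlan {
    fn as_any(&self) -> &dyn Any;
}

// Hypothetical operators, used only for illustration.
struct ScanExec;
struct SortExec;

impl ExecutionPlan for ScanExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl ExecutionPlan for SortExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Returns true if the operator may hand its input batch back unchanged, in
/// which case a consumer that buffers batches should copy them first.
/// ASSUMPTION: treating only `ScanExec` as pass-through is illustrative.
fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
    op.as_any().is::<ScanExec>()
}
```

   With a name like this, the call site reads as a question about the batch (`if can_reuse_input_batch(&left) { ... }`) rather than about arrays.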



##########
core/src/execution/datafusion/planner.rs:
##########
@@ -858,6 +859,85 @@ impl PhysicalPlanner {
                     Arc::new(CometExpandExec::new(projections, child, schema)),
                 ))
             }
+            OpStruct::SortMergeJoin(join) => {
+                assert!(children.len() == 2);
+                let (mut left_scans, left) = self.create_plan(&children[0], inputs)?;
+                let (mut right_scans, right) = self.create_plan(&children[1], inputs)?;
+
+                left_scans.append(&mut right_scans);
+
+                let left_join_exprs = join
+                    .left_join_keys
+                    .iter()
+                    .map(|expr| self.create_expr(expr, left.schema()))
+                    .collect::<Result<Vec<_>, _>>()?;
+                let right_join_exprs = join
+                    .right_join_keys
+                    .iter()
+                    .map(|expr| self.create_expr(expr, right.schema()))
+                    .collect::<Result<Vec<_>, _>>()?;
+
+                let join_on = left_join_exprs
+                    .into_iter()
+                    .zip(right_join_exprs)
+                    .collect::<Vec<_>>();
+
+                let join_type = match join.join_type.try_into() {
+                    Ok(JoinType::Inner) => DFJoinType::Inner,
+                    Ok(JoinType::LeftOuter) => DFJoinType::Left,
+                    Ok(JoinType::RightOuter) => DFJoinType::Right,
+                    Ok(JoinType::FullOuter) => DFJoinType::Full,
+                    Ok(JoinType::LeftSemi) => DFJoinType::LeftSemi,
+                    Ok(JoinType::RightSemi) => DFJoinType::RightSemi,
+                    Ok(JoinType::LeftAnti) => DFJoinType::LeftAnti,
+                    Ok(JoinType::RightAnti) => DFJoinType::RightAnti,
+                    Err(_) => {
+                        return Err(ExecutionError::GeneralError(format!(
+                            "Unsupported join type: {:?}",
+                            join.join_type
+                        )));
+                    }
+                };
+
+                let sort_options = join
+                    .sort_options
+                    .iter()
+                    .map(|sort_option| {
+                        let sort_expr = self.create_sort_expr(sort_option, left.schema()).unwrap();
+                        SortOptions {
+                            descending: sort_expr.options.descending,
+                            nulls_first: sort_expr.options.nulls_first,
+                        }
+                    })
+                    .collect();
+
+                // DataFusion `SortMergeJoinExec` operator keeps the input batch internally. We need
+                // to copy the input batch to avoid the data corruption from reusing the input
+                // batch.
+                let left = if op_reuse_array(&left) {
+                    Arc::new(CopyExec::new(left))
+                } else {
+                    left
+                };
+
+                let right = if op_reuse_array(&right) {
+                    Arc::new(CopyExec::new(right))
+                } else {
+                    right
+                };
+
+                let join = Arc::new(SortMergeJoinExec::try_new(
+                    left,
+                    right,
+                    join_on,
+                    None,
+                    join_type,
+                    sort_options,
+                    false,

Review Comment:
   nit: could we add a comment explaining why this is set to `false`?
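
   For context, and assuming this last argument is the `null_equals_null` flag of `SortMergeJoinExec::try_new` (worth double-checking against the DataFusion version in use), `false` would mean that NULL join keys never match each other, as in a plain SQL equi-join. A minimal standalone sketch of that semantics; `keys_match` is a hypothetical helper, not a DataFusion API:

```rust
/// Hypothetical helper (not a DataFusion API): decides whether two nullable
/// join keys match under the given NULL-handling mode.
fn keys_match(left: Option<i32>, right: Option<i32>, null_equals_null: bool) -> bool {
    match (left, right) {
        // Two non-NULL keys match when their values are equal.
        (Some(l), Some(r)) => l == r,
        // Two NULL keys match only in null-safe mode.
        (None, None) => null_equals_null,
        // A NULL never matches a non-NULL value.
        _ => false,
    }
}
```

   A one-line comment next to the argument stating which of these two modes is intended would probably be enough.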



##########
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala:
##########
@@ -1838,6 +1840,62 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
           }
         }
 
+      case join: SortMergeJoinExec if isCometOperatorEnabled(op.conf, "sort_merge_join") =>
+        // `requiredOrders` and `getKeyOrdering` are copied from Spark's SortMergeJoinExec.
+        def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = {
+          keys.map(SortOrder(_, Ascending))
+        }
+
+        def getKeyOrdering(

Review Comment:
   Curious: why can't we use `outputOrdering` from the join?



##########
common/src/main/java/org/apache/comet/vector/CometPlainVector.java:
##########
@@ -113,7 +113,11 @@ public UTF8String getUTF8String(int rowId) {
       byte[] result = new byte[length];
       Platform.copyMemory(
           null, valueBufferAddress + offset, result, Platform.BYTE_ARRAY_OFFSET, length);
-      return UTF8String.fromString(convertToUuid(result).toString());
+      if (length == 16) {
+        return UTF8String.fromString(convertToUuid(result).toString());
+      } else {
+        return UTF8String.fromBytes(result);
+      }

Review Comment:
   Since Iceberg support is still pending, should we replace this `else` branch with general handling that converts the FLBA type to `UTF8String`?


