This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ede2144 chore: Add config for enabling SMJ with join condition (#937)
4ede2144 is described below

commit 4ede2144316eccbe562078066ef9be8ca1deeeae
Author: Andy Grove <[email protected]>
AuthorDate: Mon Sep 16 11:31:54 2024 -0600

    chore: Add config for enabling SMJ with join condition (#937)
    
    * Add config for enabling SMJ with join condition
    
    * Update common/src/main/scala/org/apache/comet/CometConf.scala
    
    Co-authored-by: Oleks V <[email protected]>
    
    * Update docs/source/user-guide/configs.md
    
    Co-authored-by: Oleks V <[email protected]>
    
    * enable config in stability suite
    
    ---------
    
    Co-authored-by: Oleks V <[email protected]>
---
 common/src/main/scala/org/apache/comet/CometConf.scala             | 6 ++++++
 docs/source/user-guide/configs.md                                  | 1 +
 spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala   | 7 +++++++
 spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala    | 1 +
 spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala      | 1 +
 .../scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala | 1 +
 6 files changed, 17 insertions(+)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 8828b70f..03b7a2a4 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -145,6 +145,12 @@ object CometConf extends ShimCometConf {
   val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
 
+  val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: 
ConfigEntry[Boolean] =
+    conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
+      .doc("Experimental support for Sort Merge Join with filter")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_EXPR_STDDEV_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig(
       "stddev",
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 1b5fe736..ff2db342 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -54,6 +54,7 @@ Comet provides the following configuration settings.
 | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
 | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
 | spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
+| spark.comet.exec.sortMergeJoinWithJoinFilter.enabled | Experimental support for Sort Merge Join with filter | false |
 | spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true |
 | spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. | true |
 | spark.comet.exec.union.enabled | Whether to enable union by default. | true |
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index dbc3a1d8..50d92165 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2961,6 +2961,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
           }
         }
 
+        if (join.condition.isDefined &&
+          !CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED
+            .get(conf)) {
+          withInfo(join, join.condition.get)
+          return None
+        }
+
         val condition = join.condition.map { cond =>
           val condProto = exprToProto(cond, join.left.output ++ 
join.right.output)
           if (condProto.isEmpty) {
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index 1bd0e1b7..d787a9b1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -340,6 +340,7 @@ class CometJoinSuite extends CometTestBase {
 
   test("SortMergeJoin with join filter") {
     withSQLConf(
+      CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> 
"true",
       SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index d49095e2..1709cce6 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -80,6 +80,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
+    
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, 
"true")
     conf
   }
 
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index 83cc8982..a553e61c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -262,6 +262,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
       CometConf.COMET_EXEC_ENABLED.key -> "true",
       CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
       CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> 
"true",
       CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for 
v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
       "spark.sql.readSideCharPadding" -> "false",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to