This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c9f79a6 chore: disable xxhash64 by default (#548)
0c9f79a6 is described below

commit 0c9f79a6299ceb1efeec6fc90c68e11c3a156094
Author: Andy Grove <[email protected]>
AuthorDate: Mon Jun 10 13:15:24 2024 -0600

    chore: disable xxhash64 by default (#548)
    
    * disable xxhash64 by default
    
    * fix regressions
---
 .../main/scala/org/apache/comet/CometConf.scala    |  6 ++++
 docs/source/user-guide/configs.md                  |  1 +
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 32 ++++++++++++++--------
 .../org/apache/comet/CometExpressionSuite.scala    |  2 ++
 4 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 42fb5fb4..1b40c7cd 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -401,6 +401,12 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_XXHASH64_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.xxhash64.enabled")
+      .doc("The xxhash64 implementation is not optimized yet and may cause performance issues.")
+      .booleanConf
+      .createWithDefault(false)
+
 }
 
 object ConfigHelpers {
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 104f29ce..f232dc8b 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -48,3 +48,4 @@ Comet provides the following configuration settings.
 | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
 | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
 | spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when 'spark.comet.columnar.shuffle.enabled' is true. | 10.0 |
+| spark.comet.xxhash64.enabled | The xxhash64 implementation is not optimized yet and may cause performance issues. | false |
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 7d9bef48..5a0ad38d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2101,19 +2101,27 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
           scalarExprToProtoWithReturnType("murmur3_hash", IntegerType, exprs :+ seedExpr: _*)
 
         case XxHash64(children, seed) =>
-          val firstUnSupportedInput = children.find(c => !supportedDataType(c.dataType))
-          if (firstUnSupportedInput.isDefined) {
-            withInfo(expr, s"Unsupported datatype ${firstUnSupportedInput.get.dataType}")
-            return None
+          if (CometConf.COMET_XXHASH64_ENABLED.get()) {
+            val firstUnSupportedInput = children.find(c => !supportedDataType(c.dataType))
+            if (firstUnSupportedInput.isDefined) {
+              withInfo(expr, s"Unsupported datatype ${firstUnSupportedInput.get.dataType}")
+              return None
+            }
+            val exprs = children.map(exprToProtoInternal(_, inputs))
+            val seedBuilder = ExprOuterClass.Literal
+              .newBuilder()
+              .setDatatype(serializeDataType(LongType).get)
+              .setLongVal(seed)
+            val seedExpr = Some(ExprOuterClass.Expr.newBuilder().setLiteral(seedBuilder).build())
+            // the seed is put at the end of the arguments
+            scalarExprToProtoWithReturnType("xxhash64", LongType, exprs :+ seedExpr: _*)
+          } else {
+            withInfo(
+              expr,
+              "xxhash64 is disabled by default. " +
+                s"Set ${CometConf.COMET_XXHASH64_ENABLED.key}=true to enable it.")
+            None
           }
-          val exprs = children.map(exprToProtoInternal(_, inputs))
-          val seedBuilder = ExprOuterClass.Literal
-            .newBuilder()
-            .setDatatype(serializeDataType(LongType).get)
-            .setLongVal(seed)
-          val seedExpr = Some(ExprOuterClass.Expr.newBuilder().setLiteral(seedBuilder).build())
-          // the seed is put at the end of the arguments
-          scalarExprToProtoWithReturnType("xxhash64", LongType, exprs :+ seedExpr: _*)
 
         case Sha2(left, numBits) =>
           if (!numBits.foldable) {
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 7e6d2d12..10fbc468 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1506,6 +1506,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionary =>
       withSQLConf(
         "parquet.enable.dictionary" -> dictionary.toString,
+        CometConf.COMET_XXHASH64_ENABLED.key -> "true",
         CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
         val table = "test"
         withTable(table) {
@@ -1538,6 +1539,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionary =>
       withSQLConf(
         "parquet.enable.dictionary" -> dictionary.toString,
+        CometConf.COMET_XXHASH64_ENABLED.key -> "true",
         CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
         val table = "test"
         withTable(table) {
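
Usage note: the new flag can be turned back on per session. A minimal sketch, assuming a Comet-enabled SparkSession named spark and a table named test with a column c1 (neither is defined by this patch):

    // Hypothetical sketch: opt back in to Comet's native xxhash64, which this
    // commit disables by default (spark.comet.xxhash64.enabled = false).
    spark.conf.set("spark.comet.xxhash64.enabled", "true")

    // With the flag left at false, XxHash64 is not converted and the plan is
    // annotated via withInfo; with it set to true, the call is serialized as the
    // native "xxhash64" scalar expression with the seed appended as the last argument.
    spark.sql("SELECT xxhash64(c1) FROM test").show()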


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
