This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 5430f632 chore: Rename some columnar shuffle configs for code consistency (#418)
5430f632 is described below

commit 5430f632f46afb4c2130d717b2c75296218994e7
Author: Xuedong Luan <[email protected]>
AuthorDate: Mon May 13 22:25:03 2024 +0800

    chore: Rename some columnar shuffle configs for code consistency (#418)
---
 common/src/main/scala/org/apache/comet/CometConf.scala              | 6 +++---
 .../org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java   | 4 ++--
 .../comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java  | 2 +-
 .../spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java     | 2 +-
 .../apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java | 3 ++-
 .../spark/sql/comet/execution/shuffle/CometShuffleManager.scala     | 2 +-
 .../scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala     | 6 +++---
 7 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index e9349c31..e3584300 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -186,7 +186,7 @@ object CometConf {
     .booleanConf
     .createWithDefault(false)
 
-  val COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
+  val COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.async.thread.num")
       .doc("Number of threads used for Comet async columnar shuffle per 
shuffle task. " +
         "By default, this config is 3. Note that more threads means more 
memory requirement to " +
@@ -195,7 +195,7 @@ object CometConf {
       .intConf
       .createWithDefault(3)
 
-  val COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
+  val COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
     conf("spark.comet.columnar.shuffle.async.max.thread.num")
       .doc("Maximum number of threads on an executor used for Comet async 
columnar shuffle. " +
         "By default, this config is 100. This is the upper bound of total 
number of shuffle " +
@@ -207,7 +207,7 @@ object CometConf {
       .createWithDefault(100)
   }
 
-  val COMET_EXEC_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] =
+  val COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.spill.threshold")
       .doc(
         "Number of rows to be spilled used for Comet columnar shuffle. " +
diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java
index 4417c4f2..ed3e2be6 100644
--- a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java
+++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java
@@ -146,7 +146,7 @@ public final class CometShuffleExternalSorter implements CometShuffleChecksumSup
     this.numPartitions = numPartitions;
     this.schema = schema;
     this.numElementsForSpillThreshold =
-        (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD().get();
+        (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();
     this.writeMetrics = writeMetrics;
 
     this.peakMemoryUsedBytes = getMemoryUsage();
@@ -158,7 +158,7 @@ public final class CometShuffleExternalSorter implements CometShuffleChecksumSup
    this.isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get();
 
     if (isAsync) {
-      this.threadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM().get();
+      this.threadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get();
       assert (this.threadNum > 0);
       this.threadPool = ShuffleThreadPool.getThreadPool();
     } else {
diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java
index 5c17a643..108e1f2e 100644
--- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java
+++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java
@@ -141,7 +141,7 @@ final class CometBypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V>
     this.partitionChecksums = createPartitionChecksums(numPartitions, conf);
 
    this.isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get();
-    this.asyncThreadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM().get();
+    this.asyncThreadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get();
 
     if (isAsync) {
       logger.info("Async shuffle writer enabled");
diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java
index 309fcaf6..f793874d 100644
--- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java
+++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java
@@ -154,7 +154,7 @@ public final class CometDiskBlockWriter {
    this.columnarBatchSize = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_BATCH_SIZE().get();
 
     this.numElementsForSpillThreshold =
-        (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD().get();
+        (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();
 
     this.preferDictionaryRatio =
        (double) CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get();
diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java
index 69550e47..86d7d7dc 100644
--- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java
+++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java
@@ -37,7 +37,8 @@ public class ShuffleThreadPool {
         ThreadFactory factory =
            new ThreadFactoryBuilder().setNameFormat("async-shuffle-writer-%d").build();
 
-        int threadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM().get();
+        int threadNum =
+            (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM().get();
         INSTANCE =
             new ThreadPoolExecutor(
                0, threadNum, 1L, TimeUnit.SECONDS, new ThreadPoolQueue(threadNum), factory);
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
index cb342253..335fb065 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
@@ -248,7 +248,7 @@ object CometShuffleManager extends Logging {
       // Bypass merge sort if we have partition * cores fewer than
       // `spark.comet.columnar.shuffle.async.max.thread.num`
       val executorCores = conf.get(config.EXECUTOR_CORES)
-      val maxThreads = CometConf.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM.get(SQLConf.get)
+      val maxThreads = CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.get(SQLConf.get)
      val threadCond = dep.partitioner.numPartitions * executorCores <= maxThreads
 
      // Comet columnar shuffle buffers rows in memory. If too many cores are used with
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index 114351fd..600f9c44 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -52,7 +52,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
     super.test(testName, testTags: _*) {
       withSQLConf(
        CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString,
-        CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
+        CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
         CometConf.COMET_EXEC_ENABLED.key -> "false",
         CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
         CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
@@ -634,7 +634,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
         CometConf.COMET_BATCH_SIZE.key -> "10",
         CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.1",
-        CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") {
+        CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") {
         val table1 = (0 until 1000)
          .map(i => (111111.toString, 2222222.toString, 3333333.toString, i.toLong))
           .toDF("a", "b", "c", "d")
@@ -1081,7 +1081,7 @@ class CometShuffleEncryptionSuite extends CometTestBase {
 class CometShuffleManagerSuite extends CometTestBase {
 
   test("should not bypass merge sort if executor cores are too high") {
-    withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM.key -> "100") {
+    withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.key -> "100") {
       val conf = new SparkConf()
       conf.set("spark.executor.cores", "1")
 

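Note for readers tracking the rename: only the Scala constant names in CometConf change; the Spark config keys and their defaults are untouched. A minimal usage sketch (assumption: an application-side SparkSession, with illustrative values, not part of this commit) of setting the keys documented above:

    import org.apache.spark.sql.SparkSession

    // Sketch only: these key strings are the ones defined in CometConf; the commit
    // renames the Scala fields (COMET_EXEC_SHUFFLE_* -> COMET_COLUMNAR_SHUFFLE_*)
    // without changing the keys or defaults.
    val spark = SparkSession
      .builder()
      .appName("comet-columnar-shuffle-sketch") // hypothetical app name
      // Async columnar shuffle threads per shuffle task (default 3).
      .config("spark.comet.columnar.shuffle.async.thread.num", "3")
      // Upper bound on async columnar shuffle threads per executor (default 100).
      .config("spark.comet.columnar.shuffle.async.max.thread.num", "100")
      // Row count at which the columnar shuffle writer spills; value is illustrative.
      .config("spark.comet.columnar.shuffle.spill.threshold", "1000000")
      .getOrCreate()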

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
