This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 5430f632 chore: Rename some columnar shuffle configs for code consistently (#418)
5430f632 is described below
commit 5430f632f46afb4c2130d717b2c75296218994e7
Author: Xuedong Luan <[email protected]>
AuthorDate: Mon May 13 22:25:03 2024 +0800
chore: Rename some columnar shuffle configs for code consistently (#418)
---
common/src/main/scala/org/apache/comet/CometConf.scala | 6 +++---
.../org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java | 4 ++--
.../comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java | 2 +-
.../spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java | 2 +-
.../apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java | 3 ++-
.../spark/sql/comet/execution/shuffle/CometShuffleManager.scala | 2 +-
.../scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala | 6 +++---
7 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index e9349c31..e3584300 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -186,7 +186,7 @@ object CometConf {
.booleanConf
.createWithDefault(false)
- val COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
+ val COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.async.thread.num")
.doc("Number of threads used for Comet async columnar shuffle per
shuffle task. " +
"By default, this config is 3. Note that more threads means more
memory requirement to " +
@@ -195,7 +195,7 @@ object CometConf {
.intConf
.createWithDefault(3)
- val COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
+ val COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
conf("spark.comet.columnar.shuffle.async.max.thread.num")
.doc("Maximum number of threads on an executor used for Comet async
columnar shuffle. " +
"By default, this config is 100. This is the upper bound of total
number of shuffle " +
@@ -207,7 +207,7 @@ object CometConf {
.createWithDefault(100)
}
- val COMET_EXEC_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] =
+ val COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.spill.threshold")
.doc(
"Number of rows to be spilled used for Comet columnar shuffle. " +
diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java
index 4417c4f2..ed3e2be6 100644
--- a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java
+++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java
@@ -146,7 +146,7 @@ public final class CometShuffleExternalSorter implements CometShuffleChecksumSup
this.numPartitions = numPartitions;
this.schema = schema;
this.numElementsForSpillThreshold =
- (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD().get();
+ (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();
this.writeMetrics = writeMetrics;
this.peakMemoryUsedBytes = getMemoryUsage();
@@ -158,7 +158,7 @@ public final class CometShuffleExternalSorter implements CometShuffleChecksumSup
this.isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get();
if (isAsync) {
- this.threadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM().get();
+ this.threadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get();
assert (this.threadNum > 0);
this.threadPool = ShuffleThreadPool.getThreadPool();
} else {
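
Per the hunks above, the sorter reads the async flag and the per-task thread count from these entries in its constructor. A simplified Scala sketch of that decision (not the actual class; the ".get(SQLConf.get)" pattern is taken from the CometShuffleManager change further below):

    // Simplified sketch, not Comet's real code: async columnar shuffle hands
    // writes to a shared pool sized per task by the renamed thread-num entry;
    // otherwise writes stay on the task thread.
    import org.apache.comet.CometConf
    import org.apache.spark.sql.internal.SQLConf

    val isAsync: Boolean = CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.get(SQLConf.get)
    val threadNum: Int =
      if (isAsync) CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM.get(SQLConf.get)
      else 1
    assert(threadNum > 0)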
diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java
index 5c17a643..108e1f2e 100644
--- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java
+++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java
@@ -141,7 +141,7 @@ final class CometBypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V>
this.partitionChecksums = createPartitionChecksums(numPartitions, conf);
this.isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get();
- this.asyncThreadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM().get();
+ this.asyncThreadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get();
if (isAsync) {
logger.info("Async shuffle writer enabled");
diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java
index 309fcaf6..f793874d 100644
--- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java
+++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java
@@ -154,7 +154,7 @@ public final class CometDiskBlockWriter {
this.columnarBatchSize = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_BATCH_SIZE().get();
this.numElementsForSpillThreshold =
- (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD().get();
+ (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();
this.preferDictionaryRatio =
(double) CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get();
diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java
index 69550e47..86d7d7dc 100644
--- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java
+++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java
@@ -37,7 +37,8 @@ public class ShuffleThreadPool {
ThreadFactory factory =
new ThreadFactoryBuilder().setNameFormat("async-shuffle-writer-%d").build();
- int threadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM().get();
+ int threadNum =
+ (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM().get();
INSTANCE =
new ThreadPoolExecutor(
0, threadNum, 1L, TimeUnit.SECONDS, new ThreadPoolQueue(threadNum), factory);
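
The hunk above caps the shared async writer pool by the renamed max-thread-num entry. A rough Scala equivalent of that construction (ThreadPoolQueue is Comet-internal, so a plain LinkedBlockingQueue stands in here, and the named thread factory is omitted):

    // Rough sketch of a pool bounded like the one above: core size 0, max size
    // from COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM, 1-second keep-alive.
    // LinkedBlockingQueue replaces the Comet-internal ThreadPoolQueue.
    import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
    import org.apache.comet.CometConf
    import org.apache.spark.sql.internal.SQLConf

    val maxThreads: Int = CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.get(SQLConf.get)
    val pool = new ThreadPoolExecutor(
      0, maxThreads, 1L, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable](maxThreads))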
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
index cb342253..335fb065 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
@@ -248,7 +248,7 @@ object CometShuffleManager extends Logging {
// Bypass merge sort if we have partition * cores fewer than
// `spark.comet.columnar.shuffle.async.max.thread.num`
val executorCores = conf.get(config.EXECUTOR_CORES)
- val maxThreads = CometConf.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM.get(SQLConf.get)
+ val maxThreads = CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.get(SQLConf.get)
val threadCond = dep.partitioner.numPartitions * executorCores <= maxThreads
// Comet columnar shuffle buffers rows in memory. If too many cores are used with
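
The bypass condition above multiplies shuffle partitions by executor cores and compares against the renamed max-thread-num entry. A quick worked example in Scala, using the documented default of 100 (the core and partition counts are illustrative):

    // Illustrative numbers only: with the default of 100 max threads and
    // 4 executor cores, only shuffles with at most 100 / 4 = 25 partitions
    // satisfy the bypass condition.
    val executorCores = 4
    val maxThreads = 100 // default of spark.comet.columnar.shuffle.async.max.thread.num
    val numPartitions = 25
    val threadCond = numPartitions * executorCores <= maxThreads // 25 * 4 = 100 <= 100, true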
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index 114351fd..600f9c44 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -52,7 +52,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
super.test(testName, testTags: _*) {
withSQLConf(
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString,
- CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
+ CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
CometConf.COMET_EXEC_ENABLED.key -> "false",
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
@@ -634,7 +634,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
CometConf.COMET_BATCH_SIZE.key -> "10",
CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.1",
- CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") {
+ CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") {
val table1 = (0 until 1000)
.map(i => (111111.toString, 2222222.toString, 3333333.toString, i.toLong))
.toDF("a", "b", "c", "d")
@@ -1081,7 +1081,7 @@ class CometShuffleEncryptionSuite extends CometTestBase {
class CometShuffleManagerSuite extends CometTestBase {
test("should not bypass merge sort if executor cores are too high") {
- withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM.key -> "100") {
+ withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.key -> "100") {
val conf = new SparkConf()
conf.set("spark.executor.cores", "1")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]