This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new aacf45cc minor: use defaults instead of hard-coding values (#1060)
aacf45cc is described below

commit aacf45cc250074a968d83f3a9d91a3551eecb260
Author: Andy Grove <[email protected]>
AuthorDate: Thu Nov 7 09:19:05 2024 -0700

    minor: use defaults instead of hard-coding values (#1060)
    
    * minor: use defaults instead of hard-coding values
    
    * remove file
---
 .../main/scala/org/apache/comet/CometConf.scala    | 77 +++++++++++-----------
 .../apache/comet/CometSparkSessionExtensions.scala | 10 ++-
 2 files changed, 45 insertions(+), 42 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 6833aeff..7450d27a 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -73,7 +73,7 @@ object CometConf extends ShimCometConf {
       "Whether to enable native scans. When this is turned on, Spark will use 
Comet to " +
         "read supported data sources (currently only Parquet is supported 
natively). Note " +
         "that to enable native vectorized execution, both this config and " +
-        "'spark.comet.exec.enabled' need to be enabled. By default, this 
config is true.")
+        "'spark.comet.exec.enabled' need to be enabled.")
     .booleanConf
     .createWithDefault(true)
 
@@ -82,7 +82,7 @@ object CometConf extends ShimCometConf {
       .doc(
         "Whether to enable Comet's parallel reader for Parquet files. The 
parallel reader reads " +
ranges of consecutive data in a file in parallel. It is faster for 
large files and " +
-          "row groups but uses more resources. The parallel reader is enabled 
by default.")
+          "row groups but uses more resources.")
       .booleanConf
       .createWithDefault(true)
 
@@ -98,7 +98,7 @@ object CometConf extends ShimCometConf {
       .doc(
         "When enabled the parallel reader will try to merge ranges of data 
that are separated " +
           "by less than 'comet.parquet.read.io.mergeRanges.delta' bytes. 
Longer continuous reads " +
-          "are faster on cloud storage. The default behavior is to merge 
consecutive ranges.")
+          "are faster on cloud storage.")
       .booleanConf
       .createWithDefault(true)
 
@@ -115,7 +115,7 @@ object CometConf extends ShimCometConf {
       .doc("In the parallel reader, if the read ranges submitted are skewed in 
sizes, this " +
         "option will cause the reader to break up larger read ranges into 
smaller ranges to " +
         "reduce the skew. This will result in a slightly larger number of 
connections opened to " +
-        "the file system but may give improved performance. The option is off 
by default.")
+        "the file system but may give improved performance.")
       .booleanConf
       .createWithDefault(false)
 
@@ -153,7 +153,7 @@ object CometConf extends ShimCometConf {
         "native space. Note: each operator is associated with a separate 
config in the " +
         "format of 'spark.comet.exec.<operator_name>.enabled' at the moment, 
and both the " +
         "config and this need to be turned on, in order for the operator to be 
executed in " +
-        "native. By default, this config is true.")
+        "native.")
     .booleanConf
     .createWithDefault(true)
 
@@ -215,7 +215,7 @@ object CometConf extends ShimCometConf {
     "spark.comet.memory.overhead.factor")
     .doc(
       "Fraction of executor memory to be allocated as additional non-heap 
memory per executor " +
-        "process for Comet. Default value is 0.2.")
+        "process for Comet.")
     .doubleConf
     .checkValue(
       factor => factor > 0,
@@ -247,8 +247,7 @@ object CometConf extends ShimCometConf {
       "is enabled. Available modes are 'native', 'jvm', and 'auto'. " +
       "'native' is for native shuffle which has best performance in general. " 
+
       "'jvm' is for jvm-based columnar shuffle which has higher coverage than 
native shuffle. " +
-      "'auto' is for Comet to choose the best shuffle mode based on the query 
plan. " +
-      "By default, this config is 'auto'.")
+      "'auto' is for Comet to choose the best shuffle mode based on the query 
plan.")
     .internal()
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
@@ -258,8 +257,8 @@ object CometConf extends ShimCometConf {
   val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
       .doc(
-        "Whether to force enabling broadcasting for Comet native operators. By 
default, " +
-          "this config is false. Comet broadcast feature will be enabled 
automatically by " +
+        "Whether to force enabling broadcasting for Comet native operators. " +
+          "Comet broadcast feature will be enabled automatically by " +
           "Comet extension. But for unit tests, we need this feature to force 
enabling it " +
           "for invalid cases. So this config is only used for unit test.")
       .internal()
@@ -280,27 +279,26 @@ object CometConf extends ShimCometConf {
     .stringConf
     .createWithDefault("zstd")
 
-  val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf(
-    "spark.comet.columnar.shuffle.async.enabled")
-    .doc(
-      "Whether to enable asynchronous shuffle for Arrow-based shuffle. By 
default, this config " +
-        "is false.")
-    .booleanConf
-    .createWithDefault(false)
+  val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.columnar.shuffle.async.enabled")
+      .doc("Whether to enable asynchronous shuffle for Arrow-based shuffle.")
+      .booleanConf
+      .createWithDefault(false)
 
   val COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.async.thread.num")
-      .doc("Number of threads used for Comet async columnar shuffle per 
shuffle task. " +
-        "By default, this config is 3. Note that more threads means more 
memory requirement to " +
-        "buffer shuffle data before flushing to disk. Also, more threads may 
not always " +
-        "improve performance, and should be set based on the number of cores 
available.")
+      .doc(
+        "Number of threads used for Comet async columnar shuffle per shuffle 
task. " +
+          "Note that more threads means more memory requirement to " +
+          "buffer shuffle data before flushing to disk. Also, more threads may 
not always " +
+          "improve performance, and should be set based on the number of cores 
available.")
       .intConf
       .createWithDefault(3)
 
   val COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
     conf("spark.comet.columnar.shuffle.async.max.thread.num")
       .doc("Maximum number of threads on an executor used for Comet async 
columnar shuffle. " +
-        "By default, this config is 100. This is the upper bound of total 
number of shuffle " +
+        "This is the upper bound of total number of shuffle " +
         "threads per executor. In other words, if the number of cores * the 
number of shuffle " +
         "threads per task `spark.comet.columnar.shuffle.async.thread.num` is 
larger than " +
this config, Comet will use this config as the number of shuffle 
threads per " +
@@ -317,8 +315,7 @@ object CometConf extends ShimCometConf {
           "Higher value means more memory requirement to buffer shuffle data 
before " +
           "flushing to disk. As Comet uses columnar shuffle which is columnar 
format, " +
           "higher value usually helps to improve shuffle data compression 
ratio. This is " +
-          "internal config for testing purpose or advanced tuning. By default, 
" +
-          "this config is Int.Max.")
+          "internal config for testing purpose or advanced tuning.")
       .internal()
       .intConf
       .createWithDefault(Int.MaxValue)
@@ -341,8 +338,7 @@ object CometConf extends ShimCometConf {
       .doc(
         "Fraction of Comet memory to be allocated per executor process for 
Comet shuffle. " +
           "Comet memory size is specified by `spark.comet.memoryOverhead` or " 
+
-          "calculated by `spark.comet.memory.overhead.factor` * 
`spark.executor.memory`. " +
-          "By default, this config is 1.0.")
+          "calculated by `spark.comet.memory.overhead.factor` * 
`spark.executor.memory`.")
       .doubleConf
       .checkValue(
         factor => factor > 0,
@@ -360,11 +356,12 @@ object CometConf extends ShimCometConf {
 
   val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
     "spark.comet.shuffle.preferDictionary.ratio")
-    .doc("The ratio of total values to distinct values in a string column to 
decide whether to " +
-      "prefer dictionary encoding when shuffling the column. If the ratio is 
higher than " +
-      "this config, dictionary encoding will be used on shuffling string 
column. This config " +
-      "is effective if it is higher than 1.0. By default, this config is 10.0. 
Note that this " +
-      "config is only used when `spark.comet.exec.shuffle.mode` is `jvm`.")
+    .doc(
+      "The ratio of total values to distinct values in a string column to 
decide whether to " +
+        "prefer dictionary encoding when shuffling the column. If the ratio is 
higher than " +
+        "this config, dictionary encoding will be used on shuffling string 
column. This config " +
+        "is effective if it is higher than 1.0. Note that this " +
+        "config is only used when `spark.comet.exec.shuffle.mode` is `jvm`.")
     .doubleConf
     .createWithDefault(10.0)
 
@@ -377,7 +374,7 @@ object CometConf extends ShimCometConf {
   val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.debug.enabled")
       .doc(
-        "Whether to enable debug mode for Comet. By default, this config is 
false. " +
+        "Whether to enable debug mode for Comet. " +
           "When enabled, Comet will do additional checks for debugging 
purpose. For example, " +
           "validating array when importing arrays from JVM at native side. 
Note that these " +
           "checks may be expensive in performance and should only be enabled 
for debugging " +
@@ -437,19 +434,19 @@ object CometConf extends ShimCometConf {
       "The fraction of memory from Comet memory overhead that the native 
memory " +
         "manager can use for execution. The purpose of this config is to set 
aside memory for " +
         "untracked data structures, as well as imprecise size estimation 
during memory " +
-        "acquisition. Default value is 0.7.")
+        "acquisition.")
     .doubleConf
     .createWithDefault(0.7)
 
-  val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] = conf(
-    "spark.comet.parquet.enable.directBuffer")
-    .doc("Whether to use Java direct byte buffer when reading Parquet. By 
default, this is false")
-    .booleanConf
-    .createWithDefault(false)
+  val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.enable.directBuffer")
+      .doc("Whether to use Java direct byte buffer when reading Parquet.")
+      .booleanConf
+      .createWithDefault(false)
 
   val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.scan.preFetch.enabled")
-      .doc("Whether to enable pre-fetching feature of CometScan. By default is 
disabled.")
+      .doc("Whether to enable pre-fetching feature of CometScan.")
       .booleanConf
       .createWithDefault(false)
 
@@ -457,7 +454,7 @@ object CometConf extends ShimCometConf {
     conf("spark.comet.scan.preFetch.threadNum")
       .doc(
         "The number of threads running pre-fetching for CometScan. Effective 
if " +
-          s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. By default it is 2. 
Note that more " +
+          s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " +
           "pre-fetching threads means more memory requirement to store 
pre-fetched row groups.")
       .intConf
       .createWithDefault(2)
diff --git 
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala 
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index c1d63299..952ef39e 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1275,8 +1275,14 @@ object CometSparkSessionExtensions extends Logging {
       .byteFromString(sparkConf.get("spark.executor.memory", "1024MB"), 
ByteUnit.MiB)
 
     val minimum = ConfigHelpers
-      .byteFromString(sparkConf.get(COMET_MEMORY_OVERHEAD_MIN_MIB.key, "384"), 
ByteUnit.MiB)
-    val overheadFactor = sparkConf.getDouble(COMET_MEMORY_OVERHEAD_FACTOR.key, 
0.2)
+      .byteFromString(
+        sparkConf.get(
+          COMET_MEMORY_OVERHEAD_MIN_MIB.key,
+          COMET_MEMORY_OVERHEAD_MIN_MIB.defaultValueString),
+        ByteUnit.MiB)
+    val overheadFactor = sparkConf.getDouble(
+      COMET_MEMORY_OVERHEAD_FACTOR.key,
+      COMET_MEMORY_OVERHEAD_FACTOR.defaultValue.get)
 
     val overHeadMemFromConf = sparkConf
       .getOption(COMET_MEMORY_OVERHEAD.key)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to