This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new aacf45cc minor: use defaults instead of hard-coding values (#1060)
aacf45cc is described below
commit aacf45cc250074a968d83f3a9d91a3551eecb260
Author: Andy Grove <[email protected]>
AuthorDate: Thu Nov 7 09:19:05 2024 -0700
minor: use defaults instead of hard-coding values (#1060)
* minor: use defaults instead of hard-coding values
* remove file
---
.../main/scala/org/apache/comet/CometConf.scala | 77 +++++++++++-----------
.../apache/comet/CometSparkSessionExtensions.scala | 10 ++-
2 files changed, 45 insertions(+), 42 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 6833aeff..7450d27a 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -73,7 +73,7 @@ object CometConf extends ShimCometConf {
"Whether to enable native scans. When this is turned on, Spark will use
Comet to " +
"read supported data sources (currently only Parquet is supported
natively). Note " +
"that to enable native vectorized execution, both this config and " +
- "'spark.comet.exec.enabled' need to be enabled. By default, this
config is true.")
+ "'spark.comet.exec.enabled' need to be enabled.")
.booleanConf
.createWithDefault(true)
@@ -82,7 +82,7 @@ object CometConf extends ShimCometConf {
.doc(
"Whether to enable Comet's parallel reader for Parquet files. The
parallel reader reads " +
"ranges of consecutive data in a file in parallel. It is faster for
large files and " +
- "row groups but uses more resources. The parallel reader is enabled
by default.")
+ "row groups but uses more resources.")
.booleanConf
.createWithDefault(true)
@@ -98,7 +98,7 @@ object CometConf extends ShimCometConf {
.doc(
"When enabled the parallel reader will try to merge ranges of data
that are separated " +
"by less than 'comet.parquet.read.io.mergeRanges.delta' bytes.
Longer continuous reads " +
- "are faster on cloud storage. The default behavior is to merge
consecutive ranges.")
+ "are faster on cloud storage.")
.booleanConf
.createWithDefault(true)
@@ -115,7 +115,7 @@ object CometConf extends ShimCometConf {
.doc("In the parallel reader, if the read ranges submitted are skewed in
sizes, this " +
"option will cause the reader to break up larger read ranges into
smaller ranges to " +
"reduce the skew. This will result in a slightly larger number of
connections opened to " +
- "the file system but may give improved performance. The option is off
by default.")
+ "the file system but may give improved performance.")
.booleanConf
.createWithDefault(false)
@@ -153,7 +153,7 @@ object CometConf extends ShimCometConf {
"native space. Note: each operator is associated with a separate
config in the " +
"format of 'spark.comet.exec.<operator_name>.enabled' at the moment,
and both the " +
"config and this need to be turned on, in order for the operator to be
executed in " +
- "native. By default, this config is true.")
+ "native.")
.booleanConf
.createWithDefault(true)
@@ -215,7 +215,7 @@ object CometConf extends ShimCometConf {
"spark.comet.memory.overhead.factor")
.doc(
"Fraction of executor memory to be allocated as additional non-heap
memory per executor " +
- "process for Comet. Default value is 0.2.")
+ "process for Comet.")
.doubleConf
.checkValue(
factor => factor > 0,
@@ -247,8 +247,7 @@ object CometConf extends ShimCometConf {
"is enabled. Available modes are 'native', 'jvm', and 'auto'. " +
"'native' is for native shuffle which has best performance in general. "
+
"'jvm' is for jvm-based columnar shuffle which has higher coverage than
native shuffle. " +
- "'auto' is for Comet to choose the best shuffle mode based on the query
plan. " +
- "By default, this config is 'auto'.")
+ "'auto' is for Comet to choose the best shuffle mode based on the query
plan.")
.internal()
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
@@ -258,8 +257,8 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
- "Whether to force enabling broadcasting for Comet native operators. By
default, " +
- "this config is false. Comet broadcast feature will be enabled
automatically by " +
+ "Whether to force enabling broadcasting for Comet native operators. " +
+ "Comet broadcast feature will be enabled automatically by " +
"Comet extension. But for unit tests, we need this feature to force
enabling it " +
"for invalid cases. So this config is only used for unit test.")
.internal()
@@ -280,27 +279,26 @@ object CometConf extends ShimCometConf {
.stringConf
.createWithDefault("zstd")
- val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf(
- "spark.comet.columnar.shuffle.async.enabled")
- .doc(
- "Whether to enable asynchronous shuffle for Arrow-based shuffle. By
default, this config " +
- "is false.")
- .booleanConf
- .createWithDefault(false)
+ val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
+ conf("spark.comet.columnar.shuffle.async.enabled")
+ .doc("Whether to enable asynchronous shuffle for Arrow-based shuffle.")
+ .booleanConf
+ .createWithDefault(false)
val COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.async.thread.num")
- .doc("Number of threads used for Comet async columnar shuffle per
shuffle task. " +
- "By default, this config is 3. Note that more threads means more
memory requirement to " +
- "buffer shuffle data before flushing to disk. Also, more threads may
not always " +
- "improve performance, and should be set based on the number of cores
available.")
+ .doc(
+ "Number of threads used for Comet async columnar shuffle per shuffle
task. " +
+ "Note that more threads means more memory requirement to " +
+ "buffer shuffle data before flushing to disk. Also, more threads may
not always " +
+ "improve performance, and should be set based on the number of cores
available.")
.intConf
.createWithDefault(3)
val COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
conf("spark.comet.columnar.shuffle.async.max.thread.num")
.doc("Maximum number of threads on an executor used for Comet async
columnar shuffle. " +
- "By default, this config is 100. This is the upper bound of total
number of shuffle " +
+ "This is the upper bound of total number of shuffle " +
"threads per executor. In other words, if the number of cores * the
number of shuffle " +
"threads per task `spark.comet.columnar.shuffle.async.thread.num` is
larger than " +
"this config. Comet will use this config as the number of shuffle
threads per " +
@@ -317,8 +315,7 @@ object CometConf extends ShimCometConf {
"Higher value means more memory requirement to buffer shuffle data
before " +
"flushing to disk. As Comet uses columnar shuffle which is columnar
format, " +
"higher value usually helps to improve shuffle data compression
ratio. This is " +
- "internal config for testing purpose or advanced tuning. By default,
" +
- "this config is Int.Max.")
+ "internal config for testing purpose or advanced tuning.")
.internal()
.intConf
.createWithDefault(Int.MaxValue)
@@ -341,8 +338,7 @@ object CometConf extends ShimCometConf {
.doc(
"Fraction of Comet memory to be allocated per executor process for
Comet shuffle. " +
"Comet memory size is specified by `spark.comet.memoryOverhead` or "
+
- "calculated by `spark.comet.memory.overhead.factor` *
`spark.executor.memory`. " +
- "By default, this config is 1.0.")
+ "calculated by `spark.comet.memory.overhead.factor` *
`spark.executor.memory`.")
.doubleConf
.checkValue(
factor => factor > 0,
@@ -360,11 +356,12 @@ object CometConf extends ShimCometConf {
val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
"spark.comet.shuffle.preferDictionary.ratio")
- .doc("The ratio of total values to distinct values in a string column to
decide whether to " +
- "prefer dictionary encoding when shuffling the column. If the ratio is
higher than " +
- "this config, dictionary encoding will be used on shuffling string
column. This config " +
- "is effective if it is higher than 1.0. By default, this config is 10.0.
Note that this " +
- "config is only used when `spark.comet.exec.shuffle.mode` is `jvm`.")
+ .doc(
+ "The ratio of total values to distinct values in a string column to
decide whether to " +
+ "prefer dictionary encoding when shuffling the column. If the ratio is
higher than " +
+ "this config, dictionary encoding will be used on shuffling string
column. This config " +
+ "is effective if it is higher than 1.0. Note that this " +
+ "config is only used when `spark.comet.exec.shuffle.mode` is `jvm`.")
.doubleConf
.createWithDefault(10.0)
@@ -377,7 +374,7 @@ object CometConf extends ShimCometConf {
val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.debug.enabled")
.doc(
- "Whether to enable debug mode for Comet. By default, this config is
false. " +
+ "Whether to enable debug mode for Comet. " +
"When enabled, Comet will do additional checks for debugging
purpose. For example, " +
"validating array when importing arrays from JVM at native side.
Note that these " +
"checks may be expensive in performance and should only be enabled
for debugging " +
@@ -437,19 +434,19 @@ object CometConf extends ShimCometConf {
"The fraction of memory from Comet memory overhead that the native
memory " +
"manager can use for execution. The purpose of this config is to set
aside memory for " +
"untracked data structures, as well as imprecise size estimation
during memory " +
- "acquisition. Default value is 0.7.")
+ "acquisition.")
.doubleConf
.createWithDefault(0.7)
- val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] = conf(
- "spark.comet.parquet.enable.directBuffer")
- .doc("Whether to use Java direct byte buffer when reading Parquet. By
default, this is false")
- .booleanConf
- .createWithDefault(false)
+ val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
+ conf("spark.comet.parquet.enable.directBuffer")
+ .doc("Whether to use Java direct byte buffer when reading Parquet.")
+ .booleanConf
+ .createWithDefault(false)
val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.preFetch.enabled")
- .doc("Whether to enable pre-fetching feature of CometScan. By default is
disabled.")
+ .doc("Whether to enable pre-fetching feature of CometScan.")
.booleanConf
.createWithDefault(false)
@@ -457,7 +454,7 @@ object CometConf extends ShimCometConf {
conf("spark.comet.scan.preFetch.threadNum")
.doc(
"The number of threads running pre-fetching for CometScan. Effective
if " +
- s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. By default it is 2.
Note that more " +
+ s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " +
"pre-fetching threads means more memory requirement to store
pre-fetched row groups.")
.intConf
.createWithDefault(2)
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index c1d63299..952ef39e 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1275,8 +1275,14 @@ object CometSparkSessionExtensions extends Logging {
.byteFromString(sparkConf.get("spark.executor.memory", "1024MB"),
ByteUnit.MiB)
val minimum = ConfigHelpers
- .byteFromString(sparkConf.get(COMET_MEMORY_OVERHEAD_MIN_MIB.key, "384"),
ByteUnit.MiB)
- val overheadFactor = sparkConf.getDouble(COMET_MEMORY_OVERHEAD_FACTOR.key,
0.2)
+ .byteFromString(
+ sparkConf.get(
+ COMET_MEMORY_OVERHEAD_MIN_MIB.key,
+ COMET_MEMORY_OVERHEAD_MIN_MIB.defaultValueString),
+ ByteUnit.MiB)
+ val overheadFactor = sparkConf.getDouble(
+ COMET_MEMORY_OVERHEAD_FACTOR.key,
+ COMET_MEMORY_OVERHEAD_FACTOR.defaultValue.get)
val overHeadMemFromConf = sparkConf
.getOption(COMET_MEMORY_OVERHEAD.key)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]