kazuyukitanimura commented on code in PR #1185:
URL: https://github.com/apache/datafusion-comet/pull/1185#discussion_r1893200920


##########
native/core/src/execution/shuffle/row.rs:
##########
@@ -3358,7 +3359,9 @@ pub fn process_sorted_row_partition(
 
         // we do not collect metrics in Native_writeSortedFileNative
         let ipc_time = Time::default();
-        written += write_ipc_compressed(&batch, &mut cursor, &ipc_time)?;
+        // compression codec is not configurable for 
CometBypassMergeSortShuffleWriter

Review Comment:
   Did you mean TODO by this comment?



##########
native/proto/src/proto/operator.proto:
##########
@@ -82,10 +82,18 @@ message Limit {
   int32 offset = 2;
 }
 
+enum CompressionCodec {
+  None = 0;
+  Zstd = 1;
+  Lz4 = 2;

Review Comment:
   Lz4 is for the future PR?



##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -272,13 +272,21 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
-    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(
+    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
     .doc(
-      "The codec of Comet native shuffle used to compress shuffle data. Only 
zstd is supported.")
+      "The codec of Comet native shuffle used to compress shuffle data. Only 
zstd is supported. " +
+        "Compression can be disabled by setting spark.shuffle.compress=false.")
     .stringConf
+    .checkValues(Set("zstd"))
     .createWithDefault("zstd")
 
+  val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.level")
+      .doc("The compression level to use when compression shuffle files.")
+      .intConf
+      .createWithDefault(1)

Review Comment:
   Is default level (previous behavior) 3?



##########
docs/source/user-guide/tuning.md:
##########
@@ -103,6 +103,12 @@ native shuffle currently only supports `HashPartitioning` 
and `SinglePartitionin
 To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If 
this mode is explicitly set,
 then any shuffle operations that cannot be supported in this mode will fall 
back to Spark.
 
+### Shuffle Compression
+
+By default, Spark compresses shuffle files using LZ4 compression. Comet 
overrides this behavior with ZSTD compression.
+Compression can be disabled by setting `spark.shuffle.compress=false`, which 
may result in faster shuffle times in 
+certain environments, such as single-node setups with fast NVMe drives, at the 
expense of increased disk space usage.

Review Comment:
   Should we change comet default to LZ4 if Spark default is LZ4 (can be 
separate PR)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to