kazuyukitanimura commented on code in PR #1185:
URL: https://github.com/apache/datafusion-comet/pull/1185#discussion_r1893200920

##########
native/core/src/execution/shuffle/row.rs:
##########
@@ -3358,7 +3359,9 @@ pub fn process_sorted_row_partition(
     // we do not collect metrics in Native_writeSortedFileNative
     let ipc_time = Time::default();
-    written += write_ipc_compressed(&batch, &mut cursor, &ipc_time)?;
+    // compression codec is not configurable for CometBypassMergeSortShuffleWriter

Review Comment:
   Did you mean TODO by this comment?

##########
native/proto/src/proto/operator.proto:
##########
@@ -82,10 +82,18 @@ message Limit {
   int32 offset = 2;
 }
+enum CompressionCodec {
+  None = 0;
+  Zstd = 1;
+  Lz4 = 2;

Review Comment:
   Lz4 is for the future PR?

##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -272,13 +272,21 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
-  val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
-    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(
+    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
     .doc(
-      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported.")
+      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. " +
+        "Compression can be disabled by setting spark.shuffle.compress=false.")
     .stringConf
+    .checkValues(Set("zstd"))
     .createWithDefault("zstd")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.level")
+      .doc("The compression level to use when compression shuffle files.")
+      .intConf
+      .createWithDefault(1)

Review Comment:
   Is default level (previous behavior) 3?

##########
docs/source/user-guide/tuning.md:
##########
@@ -103,6 +103,12 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitionin
 To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`.
 If this mode is explicitly set, then any shuffle operations that cannot be supported in this mode will fall back to Spark.
+### Shuffle Compression
+
+By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression.
+Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in
+certain environments, such as single-node setups with fast NVMe drives, at the expense of increased disk space usage.

Review Comment:
   Should we change comet default to LZ4 if Spark default is LZ4 (can be separate PR)?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org