This is an automated email from the ASF dual-hosted git repository.
kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 309264416 [CELEBORN-1095] Support configuration of fastest available
XXHashFactory instance for checksum of Lz4Decompressor
309264416 is described below
commit 3092644168221ac66c827b0c2334bfd77013454a
Author: SteNicholas <[email protected]>
AuthorDate: Tue Oct 31 14:57:31 2023 +0800
[CELEBORN-1095] Support configuration of fastest available XXHashFactory
instance for checksum of Lz4Decompressor
### What changes were proposed in this pull request?
`CelebornConf` adds
`celeborn.client.shuffle.decompression.lz4.xxhash.instance` to configure
which `XXHashFactory` instance is used for the checksum of `Lz4Decompressor`.
Fix #2043.
### Why are the changes needed?
`Lz4Decompressor` creates its checksum with
`XXHashFactory#fastestInstance`, which by default returns the native
`XXHashFactory` instance. However, the native instance is only tried when the
native library is already loaded or the class loader is the system class
loader; otherwise the fastest Java instance is returned. The `XXHashFactory`
instance used for the checksum of `Lz4Decompressor` should therefore be
configurable instead of depending on which class loader is in use. The method
is as follows:
```
/**
* Returns the fastest available {@link XXHashFactory} instance. If the class
* loader is the system class loader and if the
* {@link #nativeInstance() native instance} loads successfully, then the
* {@link #nativeInstance() native instance} is returned, otherwise the
* {@link #fastestJavaInstance() fastest Java instance} is returned.
* <p>
* Please read {@link #nativeInstance() javadocs of nativeInstance()} before
* using this method.
*
* @return the fastest available {@link XXHashFactory} instance.
*/
public static XXHashFactory fastestInstance() {
if (Native.isLoaded()
|| Native.class.getClassLoader() ==
ClassLoader.getSystemClassLoader()) {
try {
return nativeInstance();
} catch (Throwable t) {
return fastestJavaInstance();
}
} else {
return fastestJavaInstance();
}
}
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `CelebornConfSuite`
- `ConfigurationSuite`
Closes #2050 from SteNicholas/CELEBORN-1095.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: xiyu.zk <[email protected]>
---
.../celeborn/client/compress/Decompressor.java | 5 ++++-
.../celeborn/client/compress/Lz4Decompressor.java | 25 ++++++++++++++++++++--
.../celeborn/client/compress/CodecSuiteJ.java | 4 +++-
.../org/apache/celeborn/common/CelebornConf.scala | 12 +++++++++++
.../apache/celeborn/common/CelebornConfSuite.scala | 16 ++++++++++++++
docs/configuration/client.md | 1 +
6 files changed, 59 insertions(+), 4 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
index 12dbb4be1..2918fe1f2 100644
--- a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
+++ b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
@@ -17,6 +17,8 @@
package org.apache.celeborn.client.compress;
+import scala.Option;
+
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.CompressionCodec;
@@ -37,7 +39,8 @@ public interface Decompressor {
CompressionCodec codec = conf.shuffleCompressionCodec();
switch (codec) {
case LZ4:
- return new Lz4Decompressor();
+ Option<String> xxHashInstance =
conf.shuffleDecompressionLz4XXHashInstance();
+ return new Lz4Decompressor(xxHashInstance);
case ZSTD:
return new ZstdDecompressor();
default:
diff --git
a/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
b/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
index 8392db023..94569f800 100644
---
a/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
+++
b/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
@@ -17,8 +17,13 @@
package org.apache.celeborn.client.compress;
+import java.util.Map;
+import java.util.function.Supplier;
import java.util.zip.Checksum;
+import scala.Option;
+
+import com.google.common.collect.ImmutableMap;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import net.jpountz.xxhash.XXHashFactory;
@@ -27,12 +32,22 @@ import org.slf4j.LoggerFactory;
public class Lz4Decompressor extends Lz4Trait implements Decompressor {
private static final Logger logger =
LoggerFactory.getLogger(Lz4Decompressor.class);
+
private final LZ4FastDecompressor decompressor;
private final Checksum checksum;
- public Lz4Decompressor() {
+ private final Map<String, Supplier<XXHashFactory>> xxHashFactories =
+ ImmutableMap.of(
+ "JNI",
+ XXHashFactory::nativeInstance,
+ "JAVASAFE",
+ XXHashFactory::safeInstance,
+ "JAVAUNSAFE",
+ XXHashFactory::unsafeInstance);
+
+ public Lz4Decompressor(Option<String> xxHashInstance) {
decompressor = LZ4Factory.fastestInstance().fastDecompressor();
- checksum =
XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum();
+ checksum =
getXXHashFactory(xxHashInstance).newStreamingHash32(DEFAULT_SEED).asChecksum();
}
@Override
@@ -71,4 +86,10 @@ public class Lz4Decompressor extends Lz4Trait implements
Decompressor {
return originalLen;
}
+
+ private XXHashFactory getXXHashFactory(Option<String> xxHashInstance) {
+ return xxHashInstance.isDefined() &&
xxHashFactories.containsKey(xxHashInstance.get())
+ ? xxHashFactories.get(xxHashInstance.get()).get()
+ : XXHashFactory.fastestInstance();
+ }
}
diff --git
a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
index fe8c30024..53ba4ccd4 100644
--- a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
@@ -19,6 +19,8 @@ package org.apache.celeborn.client.compress;
import java.nio.charset.StandardCharsets;
+import scala.Option;
+
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -35,7 +37,7 @@ public class CodecSuiteJ {
int oriLength = data.length;
lz4Compressor.compress(data, 0, oriLength);
- Lz4Decompressor lz4Decompressor = new Lz4Decompressor();
+ Lz4Decompressor lz4Decompressor = new Lz4Decompressor(Option.empty());
byte[] dst = new byte[oriLength];
int decompressLength =
lz4Decompressor.decompress(lz4Compressor.getCompressedBuffer(), dst, 0);
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c46579875..306e8f29e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -724,6 +724,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
// //////////////////////////////////////////////////////
def shuffleCompressionCodec: CompressionCodec =
CompressionCodec.valueOf(get(SHUFFLE_COMPRESSION_CODEC))
+ def shuffleDecompressionLz4XXHashInstance: Option[String] =
+ get(SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE)
def shuffleCompressionZstdCompressLevel: Int =
get(SHUFFLE_COMPRESSION_ZSTD_LEVEL)
// //////////////////////////////////////////////////////
@@ -3226,6 +3228,16 @@ object CelebornConf extends Logging {
CompressionCodec.NONE.name))
.createWithDefault(CompressionCodec.LZ4.name)
+ val SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE: OptionalConfigEntry[String] =
+ buildConf("celeborn.client.shuffle.decompression.lz4.xxhash.instance")
+ .categories("client")
+ .doc("Decompression XXHash instance for Lz4. Available options: JNI,
JAVASAFE, JAVAUNSAFE.")
+ .version("0.3.2")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(Set("JNI", "JAVASAFE", "JAVAUNSAFE"))
+ .createOptional
+
val SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
buildConf("celeborn.client.shuffle.compression.zstd.level")
.withAlternative("celeborn.shuffle.compression.zstd.level")
diff --git
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 3e6cbc7ef..f57ce9040 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -118,6 +118,22 @@ class CelebornConfSuite extends CelebornFunSuite {
assert(workerBaseDirs.head._3 == 9)
}
+ test("CELEBORN-1095: Support configuration of fastest available
XXHashFactory instance for checksum of Lz4Decompressor") {
+ val conf = new CelebornConf()
+ conf.set(CelebornConf.SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE.key, "JNI")
+ assert(conf.shuffleDecompressionLz4XXHashInstance.get == "JNI")
+ conf.set(CelebornConf.SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE.key,
"JAVASAFE")
+ assert(conf.shuffleDecompressionLz4XXHashInstance.get == "JAVASAFE")
+ conf.set(CelebornConf.SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE.key,
"JAVAUNSAFE")
+ assert(conf.shuffleDecompressionLz4XXHashInstance.get == "JAVAUNSAFE")
+ val error = intercept[IllegalArgumentException] {
+ conf.set(CelebornConf.SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE.key,
"NATIVE")
+ assert(conf.shuffleDecompressionLz4XXHashInstance.get == "NATIVE")
+ }.getMessage
+ assert(error.contains(
+ s"The value of
${CelebornConf.SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE.key} should be one of
JNI, JAVASAFE, JAVAUNSAFE, but was NATIVE"))
+ }
+
test("zstd level") {
val conf = new CelebornConf()
val error1 = intercept[IllegalArgumentException] {
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 793794a4d..1ff362660 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -88,6 +88,7 @@ license: |
| celeborn.client.shuffle.batchHandleReleasePartition.threads | 8 | Threads
number for LifecycleManager to handle release partition request in batch. |
0.3.0 |
| celeborn.client.shuffle.compression.codec | LZ4 | The codec used to compress
shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`,
`none`. | 0.3.0 |
| celeborn.client.shuffle.compression.zstd.level | 1 | Compression level for
Zstd compression codec, its value should be an integer between -5 and 22.
Increasing the compression level will result in better compression at the
expense of more CPU and memory. | 0.3.0 |
+| celeborn.client.shuffle.decompression.lz4.xxhash.instance |
<undefined> | Decompression XXHash instance for Lz4. Available options:
JNI, JAVASAFE, JAVAUNSAFE. | 0.3.2 |
| celeborn.client.shuffle.expired.checkInterval | 60s | Interval for client to
check expired shuffles. | 0.3.0 |
| celeborn.client.shuffle.manager.port | 0 | Port used by the LifecycleManager
on the Driver. | 0.3.0 |
| celeborn.client.shuffle.mapPartition.split.enabled | false | whether to
enable shuffle partition split. Currently, this only applies to MapPartition. |
0.3.1 |