This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 309264416 [CELEBORN-1095] Support configuration of fastest available 
XXHashFactory instance for checksum of Lz4Decompressor
309264416 is described below

commit 3092644168221ac66c827b0c2334bfd77013454a
Author: SteNicholas <[email protected]>
AuthorDate: Tue Oct 31 14:57:31 2023 +0800

    [CELEBORN-1095] Support configuration of fastest available XXHashFactory 
instance for checksum of Lz4Decompressor
    
    ### What changes were proposed in this pull request?
    
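    `CelebornConf` adds `celeborn.client.shuffle.decompression.lz4.xxhash.instance`
    to configure which `XXHashFactory` instance (`JNI`, `JAVASAFE` or `JAVAUNSAFE`)
    `Lz4Decompressor` uses for its checksum. When the option is unset,
    `XXHashFactory#fastestInstance` is still used. Fixes #2043.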
    
    ### Why are the changes needed?
    
    `Lz4Decompressor` creates its checksum with `XXHashFactory#fastestInstance`.
    That method returns the native (JNI) instance only when the native library is
    already loaded or the `Native` class was loaded by the system class loader (and
    the native instance can be created). Otherwise it returns the fastest Java
    instance, so the instance actually used depends on the class loader. Making the
    instance configurable removes this dependency. The method is as follows:
    ```
    /**
     * Returns the fastest available {link XXHashFactory} instance. If the class
     * loader is the system class loader and if the
     * {link #nativeInstance() native instance} loads successfully, then the
     * {link #nativeInstance() native instance} is returned, otherwise the
     * {link #fastestJavaInstance() fastest Java instance} is returned.
     * <p>
     * Please read {link #nativeInstance() javadocs of nativeInstance()} before
     * using this method.
     *
     * return the fastest available {link XXHashFactory} instance.
     */
    public static XXHashFactory fastestInstance() {
      if (Native.isLoaded()
          || Native.class.getClassLoader() == 
ClassLoader.getSystemClassLoader()) {
        try {
          return nativeInstance();
        } catch (Throwable t) {
          return fastestJavaInstance();
        }
      } else {
        return fastestJavaInstance();
      }
    }
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
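    No behavior change by default: the new client configuration is optional, and
    when it is unset `XXHashFactory#fastestInstance` is still used.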
    
    ### How was this patch tested?
    
    - `CelebornConfSuite`
    - `ConfigurationSuite`
    
    Closes #2050 from SteNicholas/CELEBORN-1095.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: xiyu.zk <[email protected]>
---
 .../celeborn/client/compress/Decompressor.java     |  5 ++++-
 .../celeborn/client/compress/Lz4Decompressor.java  | 25 ++++++++++++++++++++--
 .../celeborn/client/compress/CodecSuiteJ.java      |  4 +++-
 .../org/apache/celeborn/common/CelebornConf.scala  | 12 +++++++++++
 .../apache/celeborn/common/CelebornConfSuite.scala | 16 ++++++++++++++
 docs/configuration/client.md                       |  1 +
 6 files changed, 59 insertions(+), 4 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java 
b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
index 12dbb4be1..2918fe1f2 100644
--- a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
+++ b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
@@ -17,6 +17,8 @@
 
 package org.apache.celeborn.client.compress;
 
+import scala.Option;
+
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.CompressionCodec;
 
@@ -37,7 +39,8 @@ public interface Decompressor {
     CompressionCodec codec = conf.shuffleCompressionCodec();
     switch (codec) {
       case LZ4:
-        return new Lz4Decompressor();
+        Option<String> xxHashInstance = 
conf.shuffleDecompressionLz4XXHashInstance();
+        return new Lz4Decompressor(xxHashInstance);
       case ZSTD:
         return new ZstdDecompressor();
       default:
diff --git 
a/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java 
b/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
index 8392db023..94569f800 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
@@ -17,8 +17,13 @@
 
 package org.apache.celeborn.client.compress;
 
+import java.util.Map;
+import java.util.function.Supplier;
 import java.util.zip.Checksum;
 
+import scala.Option;
+
+import com.google.common.collect.ImmutableMap;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4FastDecompressor;
 import net.jpountz.xxhash.XXHashFactory;
@@ -27,12 +32,22 @@ import org.slf4j.LoggerFactory;
 
 public class Lz4Decompressor extends Lz4Trait implements Decompressor {
   private static final Logger logger = 
LoggerFactory.getLogger(Lz4Decompressor.class);
+
   private final LZ4FastDecompressor decompressor;
   private final Checksum checksum;
 
-  public Lz4Decompressor() {
+  private final Map<String, Supplier<XXHashFactory>> xxHashFactories =
+      ImmutableMap.of(
+          "JNI",
+          XXHashFactory::nativeInstance,
+          "JAVASAFE",
+          XXHashFactory::safeInstance,
+          "JAVAUNSAFE",
+          XXHashFactory::unsafeInstance);
+
+  public Lz4Decompressor(Option<String> xxHashInstance) {
     decompressor = LZ4Factory.fastestInstance().fastDecompressor();
-    checksum = 
XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum();
+    checksum = 
getXXHashFactory(xxHashInstance).newStreamingHash32(DEFAULT_SEED).asChecksum();
   }
 
   @Override
@@ -71,4 +86,10 @@ public class Lz4Decompressor extends Lz4Trait implements 
Decompressor {
 
     return originalLen;
   }
+
+  private XXHashFactory getXXHashFactory(Option<String> xxHashInstance) {
+    return xxHashInstance.isDefined() && 
xxHashFactories.containsKey(xxHashInstance.get())
+        ? xxHashFactories.get(xxHashInstance.get()).get()
+        : XXHashFactory.fastestInstance();
+  }
 }
diff --git 
a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java 
b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
index fe8c30024..53ba4ccd4 100644
--- a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
@@ -19,6 +19,8 @@ package org.apache.celeborn.client.compress;
 
 import java.nio.charset.StandardCharsets;
 
+import scala.Option;
+
 import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -35,7 +37,7 @@ public class CodecSuiteJ {
     int oriLength = data.length;
     lz4Compressor.compress(data, 0, oriLength);
 
-    Lz4Decompressor lz4Decompressor = new Lz4Decompressor();
+    Lz4Decompressor lz4Decompressor = new Lz4Decompressor(Option.empty());
     byte[] dst = new byte[oriLength];
     int decompressLength = 
lz4Decompressor.decompress(lz4Compressor.getCompressedBuffer(), dst, 0);
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c46579875..306e8f29e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -724,6 +724,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   // //////////////////////////////////////////////////////
   def shuffleCompressionCodec: CompressionCodec =
     CompressionCodec.valueOf(get(SHUFFLE_COMPRESSION_CODEC))
+  def shuffleDecompressionLz4XXHashInstance: Option[String] =
+    get(SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE)
   def shuffleCompressionZstdCompressLevel: Int = 
get(SHUFFLE_COMPRESSION_ZSTD_LEVEL)
 
   // //////////////////////////////////////////////////////
@@ -3226,6 +3228,16 @@ object CelebornConf extends Logging {
         CompressionCodec.NONE.name))
       .createWithDefault(CompressionCodec.LZ4.name)
 
+  val SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE: OptionalConfigEntry[String] =
+    buildConf("celeborn.client.shuffle.decompression.lz4.xxhash.instance")
+      .categories("client")
+      .doc("Decompression XXHash instance for Lz4. Available options: JNI, 
JAVASAFE, JAVAUNSAFE.")
+      .version("0.3.2")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(Set("JNI", "JAVASAFE", "JAVAUNSAFE"))
+      .createOptional
+
   val SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
     buildConf("celeborn.client.shuffle.compression.zstd.level")
       .withAlternative("celeborn.shuffle.compression.zstd.level")
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 3e6cbc7ef..f57ce9040 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -118,6 +118,22 @@ class CelebornConfSuite extends CelebornFunSuite {
     assert(workerBaseDirs.head._3 == 9)
   }
 
+  test("CELEBORN-1095: Support configuration of fastest available 
XXHashFactory instance for checksum of Lz4Decompressor") {
+    val conf = new CelebornConf()
+    conf.set(CelebornConf.SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE.key, "JNI")
+    assert(conf.shuffleDecompressionLz4XXHashInstance.get == "JNI")
+    conf.set(CelebornConf.SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE.key, 
"JAVASAFE")
+    assert(conf.shuffleDecompressionLz4XXHashInstance.get == "JAVASAFE")
+    conf.set(CelebornConf.SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE.key, 
"JAVAUNSAFE")
+    assert(conf.shuffleDecompressionLz4XXHashInstance.get == "JAVAUNSAFE")
+    val error = intercept[IllegalArgumentException] {
+      conf.set(CelebornConf.SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE.key, 
"NATIVE")
+      assert(conf.shuffleDecompressionLz4XXHashInstance.get == "NATIVE")
+    }.getMessage
+    assert(error.contains(
+      s"The value of 
${CelebornConf.SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE.key} should be one of 
JNI, JAVASAFE, JAVAUNSAFE, but was NATIVE"))
+  }
+
   test("zstd level") {
     val conf = new CelebornConf()
     val error1 = intercept[IllegalArgumentException] {
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 793794a4d..1ff362660 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -88,6 +88,7 @@ license: |
 | celeborn.client.shuffle.batchHandleReleasePartition.threads | 8 | Threads 
number for LifecycleManager to handle release partition request in batch. | 
0.3.0 | 
 | celeborn.client.shuffle.compression.codec | LZ4 | The codec used to compress 
shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, 
`none`. | 0.3.0 | 
 | celeborn.client.shuffle.compression.zstd.level | 1 | Compression level for 
Zstd compression codec, its value should be an integer between -5 and 22. 
Increasing the compression level will result in better compression at the 
expense of more CPU and memory. | 0.3.0 | 
+| celeborn.client.shuffle.decompression.lz4.xxhash.instance | 
&lt;undefined&gt; | Decompression XXHash instance for Lz4. Available options: 
JNI, JAVASAFE, JAVAUNSAFE. | 0.3.2 | 
 | celeborn.client.shuffle.expired.checkInterval | 60s | Interval for client to 
check expired shuffles. | 0.3.0 | 
 | celeborn.client.shuffle.manager.port | 0 | Port used by the LifecycleManager 
on the Driver. | 0.3.0 | 
 | celeborn.client.shuffle.mapPartition.split.enabled | false | whether to 
enable shuffle partition split. Currently, this only applies to MapPartition. | 
0.3.1 | 

Reply via email to