This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 23469fd93721 [SPARK-49459][CORE][SHUFFLE] Support `CRC32C` for Shuffle Checksum
23469fd93721 is described below

commit 23469fd937213941e56e696d9cbf485032560781
Author: Kent Yao <[email protected]>
AuthorDate: Fri Aug 30 09:06:14 2024 -0700

    [SPARK-49459][CORE][SHUFFLE] Support `CRC32C` for Shuffle Checksum
    
    ### What changes were proposed in this pull request?
    
    This PR adds (java.util.zip.)CRC32C to `spark.shuffle.checksum.algorithm`. 
CRC32C has been supported by JDK since 9.
    
    >  /*
    >   * This CRC-32C implementation uses the 'slicing-by-8' algorithm described
    >   * in the paper "A Systematic Approach to Building High Performance
    >   * Software-Based CRC Generators" by Michael E. Kounavis and Frank L. Berry,
    >   * Intel Research and Development
    >   */
    
    ### Why are the changes needed?
    
    CRC32C performs better on some SIMD CPU instruction sets
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, spark.shuffle.checksum.algorithm can be set to CRC32C.
    
    ### How was this patch tested?
    
    I tested this via benchmark
    
    - On GitHub Action Runner, [CRC32C is the best](https://github.com/apache/spark/pull/47929/files#diff-3e3d85e77ed82e6448cd3ce006346f9c947532ab2b9de12b7642e3bf54cf2d61R9-R12)
    
    - On my Mac M2, Adler32 is the fastest
    ```
    
================================================================================================
    Benchmark Checksum Algorithms
    
================================================================================================
    
    OpenJDK 64-Bit Server VM 17.0.12+0 on Mac OS X 14.6.1
    Apple M2 Max
    Checksum Algorithms:                      Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    
------------------------------------------------------------------------------------------------------------------------
    CRC32                                              4145           4190      
    46          0.0     4047834.9       1.0X
    CRC32C                                             4115           4155      
    35          0.0     4018904.7       1.0X
    Adler32                                            1961           1972      
    16          0.0     1914619.1       2.1X
    PureJavaCrc32C                                    18115          18322      
   245          0.0    17690350.5       0.2X
    
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #47929 from yaooqinn/crc32c.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../shuffle/checksum/ShuffleChecksumHelper.java    | 12 +++--
 .../benchmarks/ChecksumBenchmark-jdk21-results.txt | 14 +++++
 core/benchmarks/ChecksumBenchmark-results.txt      | 14 +++++
 .../org/apache/spark/internal/config/package.scala |  3 +-
 .../apache/spark/shuffle/ChecksumBenchmark.scala   | 59 ++++++++++++++++++++++
 docs/configuration.md                              |  2 +-
 6 files changed, 97 insertions(+), 7 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
index f9c0c60c2f2c..62fcda701d94 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
@@ -19,10 +19,7 @@ package org.apache.spark.network.shuffle.checksum;
 
 import java.io.*;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.Adler32;
-import java.util.zip.CRC32;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.Checksum;
+import java.util.zip.*;
 
 import com.google.common.io.ByteStreams;
 
@@ -66,6 +63,13 @@ public class ShuffleChecksumHelper {
         }
       }
 
+      case "CRC32C"  -> {
+        checksums = new CRC32C[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new CRC32C();
+        }
+      }
+
       default -> throw new UnsupportedOperationException(
         "Unsupported shuffle checksum algorithm: " + algorithm);
     }
diff --git a/core/benchmarks/ChecksumBenchmark-jdk21-results.txt 
b/core/benchmarks/ChecksumBenchmark-jdk21-results.txt
new file mode 100644
index 000000000000..85370450f355
--- /dev/null
+++ b/core/benchmarks/ChecksumBenchmark-jdk21-results.txt
@@ -0,0 +1,14 @@
+================================================================================================
+Benchmark Checksum Algorithms
+================================================================================================
+
+OpenJDK 64-Bit Server VM 21.0.4+7-LTS on Linux 6.5.0-1025-azure
+AMD EPYC 7763 64-Core Processor
+Checksum Algorithms:                      Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+CRC32                                              2743           2746         
  3          0.0     2678409.9       1.0X
+CRC32C                                             1974           2055         
 70          0.0     1928129.2       1.4X
+Adler32                                           12689          12709         
 17          0.0    12391425.9       0.2X
+hadoop PureJavaCrc32C                             23027          23041         
 13          0.0    22487098.9       0.1X
+
+
diff --git a/core/benchmarks/ChecksumBenchmark-results.txt 
b/core/benchmarks/ChecksumBenchmark-results.txt
new file mode 100644
index 000000000000..cce5a61abf63
--- /dev/null
+++ b/core/benchmarks/ChecksumBenchmark-results.txt
@@ -0,0 +1,14 @@
+================================================================================================
+Benchmark Checksum Algorithms
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.12+7-LTS on Linux 6.5.0-1025-azure
+AMD EPYC 7763 64-Core Processor
+Checksum Algorithms:                      Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+CRC32                                              2757           2758         
  1          0.0     2692250.2       1.0X
+CRC32C                                             2142           2244         
116          0.0     2091901.8       1.3X
+Adler32                                           12699          12712         
 15          0.0    12401205.6       0.2X
+hadoop PureJavaCrc32C                             23049          23066         
 15          0.0    22508320.3       0.1X
+
+
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 8224bcac2830..0e19143411e9 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1630,8 +1630,7 @@ package object config {
       .version("3.2.0")
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
-      .checkValue(Set("ADLER32", "CRC32").contains, "Shuffle checksum 
algorithm " +
-        "should be either ADLER32 or CRC32.")
+      .checkValues(Set("ADLER32", "CRC32", "CRC32C"))
       .createWithDefault("ADLER32")
 
   private[spark] val SHUFFLE_COMPRESS =
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/ChecksumBenchmark.scala 
b/core/src/test/scala/org/apache/spark/shuffle/ChecksumBenchmark.scala
new file mode 100644
index 000000000000..16a50fabb7ff
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/ChecksumBenchmark.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.util.zip.{Adler32, CRC32, CRC32C}
+
+import org.apache.hadoop.util.PureJavaCrc32C
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+
+/**
+ * Benchmark for Checksum Algorithms used by shuffle.
+ * {{{
+ *   To run this benchmark:
+ *   1. without sbt: bin/spark-submit --class <this class> <spark core test 
jar>
+ *   2. build/sbt "core/Test/runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt 
"core/Test/runMain <this class>"
+ *      Results will be written to "benchmarks/ChecksumBenchmark-results.txt".
+ * }}}
+ */
+object ChecksumBenchmark extends BenchmarkBase {
+
+  val N = 1024
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("Benchmark Checksum Algorithms") {
+      val data: Array[Byte] = (1 until 32 * 1024 * 1024).map(_.toByte).toArray
+      val benchmark = new Benchmark("Checksum Algorithms", N, 3, output = 
output)
+      benchmark.addCase("CRC32") { _ =>
+        (1 to N).foreach(_ => new CRC32().update(data))
+      }
+      benchmark.addCase(s"CRC32C") { _ =>
+        (1 to N).foreach(_ => new CRC32C().update(data))
+      }
+      benchmark.addCase(s"Adler32") { _ =>
+        (1 to N).foreach(_ => new Adler32().update(data))
+      }
+      benchmark.addCase(s"hadoop PureJavaCrc32C") { _ =>
+        (1 to N).foreach(_ => new PureJavaCrc32C().update(data))
+      }
+      benchmark.run()
+    }
+  }
+}
diff --git a/docs/configuration.md b/docs/configuration.md
index 2da099a6c5ed..73d57b687ca2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1332,7 +1332,7 @@ Apart from these, the following properties are also 
available, and may be useful
   <td><code>spark.shuffle.checksum.algorithm</code></td>
   <td>ADLER32</td>
   <td>
-    The algorithm is used to calculate the shuffle checksum. Currently, it 
only supports built-in algorithms of JDK, e.g., ADLER32, CRC32.
+    The algorithm is used to calculate the shuffle checksum. Currently, it 
only supports built-in algorithms of JDK, e.g., ADLER32, CRC32 and CRC32C.
   </td>
   <td>3.2.0</td>
 </tr>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to