This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 23469fd93721 [SPARK-49459][CORE][SHUFFLE] Support `CRC32C` for Shuffle
Checksum
23469fd93721 is described below
commit 23469fd937213941e56e696d9cbf485032560781
Author: Kent Yao <[email protected]>
AuthorDate: Fri Aug 30 09:06:14 2024 -0700
[SPARK-49459][CORE][SHUFFLE] Support `CRC32C` for Shuffle Checksum
### What changes were proposed in this pull request?
This PR adds (java.util.zip.)CRC32C to `spark.shuffle.checksum.algorithm`.
CRC32C has been supported by the JDK since version 9.
> /*
* This CRC-32C implementation uses the 'slicing-by-8' algorithm
described
* in the paper "A Systematic Approach to Building High Performance
* Software-Based CRC Generators" by Michael E. Kounavis and Frank L.
Berry,
* Intel Research and Development
*/
### Why are the changes needed?
CRC32C performs better on some SIMD CPU instruction sets
### Does this PR introduce _any_ user-facing change?
Yes, spark.shuffle.checksum.algorithm can be set to CRC32C.
### How was this patch tested?
I tested this via benchmark
- On GitHub Action Runner, [CRC32C is the
best](https://github.com/apache/spark/pull/47929/files#diff-3e3d85e77ed82e6448cd3ce006346f9c947532ab2b9de12b7642e3bf54cf2d61R9-R12)
- On my Mac M2, Adler32 is the fastest
```
================================================================================================
Benchmark Checksum Algorithms
================================================================================================
OpenJDK 64-Bit Server VM 17.0.12+0 on Mac OS X 14.6.1
Apple M2 Max
Checksum Algorithms: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
CRC32 4145 4190
46 0.0 4047834.9 1.0X
CRC32C 4115 4155
35 0.0 4018904.7 1.0X
Adler32 1961 1972
16 0.0 1914619.1 2.1X
PureJavaCrc32C 18115 18322
245 0.0 17690350.5 0.2X
```
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #47929 from yaooqinn/crc32c.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../shuffle/checksum/ShuffleChecksumHelper.java | 12 +++--
.../benchmarks/ChecksumBenchmark-jdk21-results.txt | 14 +++++
core/benchmarks/ChecksumBenchmark-results.txt | 14 +++++
.../org/apache/spark/internal/config/package.scala | 3 +-
.../apache/spark/shuffle/ChecksumBenchmark.scala | 59 ++++++++++++++++++++++
docs/configuration.md | 2 +-
6 files changed, 97 insertions(+), 7 deletions(-)
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
index f9c0c60c2f2c..62fcda701d94 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
@@ -19,10 +19,7 @@ package org.apache.spark.network.shuffle.checksum;
import java.io.*;
import java.util.concurrent.TimeUnit;
-import java.util.zip.Adler32;
-import java.util.zip.CRC32;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.Checksum;
+import java.util.zip.*;
import com.google.common.io.ByteStreams;
@@ -66,6 +63,13 @@ public class ShuffleChecksumHelper {
}
}
+ case "CRC32C" -> {
+ checksums = new CRC32C[num];
+ for (int i = 0; i < num; i++) {
+ checksums[i] = new CRC32C();
+ }
+ }
+
default -> throw new UnsupportedOperationException(
"Unsupported shuffle checksum algorithm: " + algorithm);
}
diff --git a/core/benchmarks/ChecksumBenchmark-jdk21-results.txt
b/core/benchmarks/ChecksumBenchmark-jdk21-results.txt
new file mode 100644
index 000000000000..85370450f355
--- /dev/null
+++ b/core/benchmarks/ChecksumBenchmark-jdk21-results.txt
@@ -0,0 +1,14 @@
+================================================================================================
+Benchmark Checksum Algorithms
+================================================================================================
+
+OpenJDK 64-Bit Server VM 21.0.4+7-LTS on Linux 6.5.0-1025-azure
+AMD EPYC 7763 64-Core Processor
+Checksum Algorithms: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+CRC32 2743 2746
3 0.0 2678409.9 1.0X
+CRC32C 1974 2055
70 0.0 1928129.2 1.4X
+Adler32 12689 12709
17 0.0 12391425.9 0.2X
+hadoop PureJavaCrc32C 23027 23041
13 0.0 22487098.9 0.1X
+
+
diff --git a/core/benchmarks/ChecksumBenchmark-results.txt
b/core/benchmarks/ChecksumBenchmark-results.txt
new file mode 100644
index 000000000000..cce5a61abf63
--- /dev/null
+++ b/core/benchmarks/ChecksumBenchmark-results.txt
@@ -0,0 +1,14 @@
+================================================================================================
+Benchmark Checksum Algorithms
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.12+7-LTS on Linux 6.5.0-1025-azure
+AMD EPYC 7763 64-Core Processor
+Checksum Algorithms: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+CRC32 2757 2758
1 0.0 2692250.2 1.0X
+CRC32C 2142 2244
116 0.0 2091901.8 1.3X
+Adler32 12699 12712
15 0.0 12401205.6 0.2X
+hadoop PureJavaCrc32C 23049 23066
15 0.0 22508320.3 0.1X
+
+
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 8224bcac2830..0e19143411e9 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1630,8 +1630,7 @@ package object config {
.version("3.2.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
- .checkValue(Set("ADLER32", "CRC32").contains, "Shuffle checksum
algorithm " +
- "should be either ADLER32 or CRC32.")
+ .checkValues(Set("ADLER32", "CRC32", "CRC32C"))
.createWithDefault("ADLER32")
private[spark] val SHUFFLE_COMPRESS =
diff --git
a/core/src/test/scala/org/apache/spark/shuffle/ChecksumBenchmark.scala
b/core/src/test/scala/org/apache/spark/shuffle/ChecksumBenchmark.scala
new file mode 100644
index 000000000000..16a50fabb7ff
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/ChecksumBenchmark.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.util.zip.{Adler32, CRC32, CRC32C}
+
+import org.apache.hadoop.util.PureJavaCrc32C
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+
+/**
+ * Benchmark for Checksum Algorithms used by shuffle.
+ * {{{
+ * To run this benchmark:
+ * 1. without sbt: bin/spark-submit --class <this class> <spark core test
jar>
+ * 2. build/sbt "core/Test/runMain <this class>"
+ * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt
"core/Test/runMain <this class>"
+ * Results will be written to "benchmarks/ChecksumBenchmark-results.txt".
+ * }}}
+ */
+object ChecksumBenchmark extends BenchmarkBase {
+
+ val N = 1024
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ runBenchmark("Benchmark Checksum Algorithms") {
+ val data: Array[Byte] = (1 until 32 * 1024 * 1024).map(_.toByte).toArray
+ val benchmark = new Benchmark("Checksum Algorithms", N, 3, output =
output)
+ benchmark.addCase("CRC32") { _ =>
+ (1 to N).foreach(_ => new CRC32().update(data))
+ }
+ benchmark.addCase(s"CRC32C") { _ =>
+ (1 to N).foreach(_ => new CRC32C().update(data))
+ }
+ benchmark.addCase(s"Adler32") { _ =>
+ (1 to N).foreach(_ => new Adler32().update(data))
+ }
+ benchmark.addCase(s"hadoop PureJavaCrc32C") { _ =>
+ (1 to N).foreach(_ => new PureJavaCrc32C().update(data))
+ }
+ benchmark.run()
+ }
+ }
+}
diff --git a/docs/configuration.md b/docs/configuration.md
index 2da099a6c5ed..73d57b687ca2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1332,7 +1332,7 @@ Apart from these, the following properties are also
available, and may be useful
<td><code>spark.shuffle.checksum.algorithm</code></td>
<td>ADLER32</td>
<td>
- The algorithm is used to calculate the shuffle checksum. Currently, it
only supports built-in algorithms of JDK, e.g., ADLER32, CRC32.
+ The algorithm is used to calculate the shuffle checksum. Currently, it
only supports built-in algorithms of JDK, e.g., ADLER32, CRC32 and CRC32C.
</td>
<td>3.2.0</td>
</tr>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]