This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 9149cad [SPARK-32210][CORE] Fix NegativeArraySizeException in
MapOutputTracker with large spark.default.parallelism
9149cad is described below
commit 9149cad57d04f51e246f7a61cd62577cbec73190
Author: Kazuyuki Tanimura <[email protected]>
AuthorDate: Mon Aug 16 09:11:39 2021 -0700
[SPARK-32210][CORE] Fix NegativeArraySizeException in MapOutputTracker with
large spark.default.parallelism
### What changes were proposed in this pull request?
The current `MapOutputTracker` class may throw `NegativeArraySizeException`
with a large number of partitions. The `serializeOutputStatuses()` method
compresses an array of mapStatuses and writes the binary data into an
(Apache) `ByteArrayOutputStream`. Inside (Apache)
`ByteArrayOutputStream.toByteArray()`, the exception is thrown because the
index is an `int` and overflows (2GB limit) when the output binary size
is too large.
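The overflow itself can be reproduced in isolation. The following is a minimal sketch, not code from this patch: `IntOverflowSketch` and `addToIntCount` are hypothetical names that stand in for a stream tracking its byte count in an `Int`.

```scala
object IntOverflowSketch {
  // Hypothetical helper: adds a chunk length to a running Int byte counter,
  // the way an array-backed stream with an Int index would.
  def addToIntCount(count: Int, chunkLength: Int): Int = count + chunkLength

  // Returns true if allocating a byte array of the given size fails with
  // NegativeArraySizeException.
  def allocationFails(size: Int): Boolean =
    try {
      new Array[Byte](size)
      false
    } catch {
      case _: NegativeArraySizeException => true
    }

  def main(args: Array[String]): Unit = {
    val oneGiB = 1024 * 1024 * 1024
    // Two 1 GiB writes push the Int counter past Int.MaxValue, so it wraps.
    val wrapped = addToIntCount(addToIntCount(0, oneGiB), oneGiB)
    println(s"Int counter after writing 2 GiB: $wrapped")
    // The same sum kept in a Long stays positive.
    println(s"Long counter after writing 2 GiB: ${oneGiB.toLong + oneGiB.toLong}")
    println(s"Allocation with the wrapped size fails: ${allocationFails(wrapped)}")
  }
}
```

Only the negative size is passed to the allocation, so the sketch does not need a large heap to run.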
This PR proposes two high-level ideas:
1. Use `org.apache.spark.util.io.ChunkedByteBufferOutputStream`, which
has a way to output the underlying buffer as `Array[Array[Byte]]`.
2. Change the signatures from `Array[Byte]` to `Array[Array[Byte]]` in
order to handle over 2GB compressed data.
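As an illustration of the two ideas, here is a simplified stand-in written only for this description. It is not Spark's `ChunkedByteBufferOutputStream`; `ChunkedSink` and `toInputStream` are hypothetical names. The sketch assumes fixed-size chunks, keeps the total size in a `Long`, and reads the chunks back as one continuous stream.

```scala
import java.io.{ByteArrayInputStream, InputStream, OutputStream, SequenceInputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical chunked sink: bytes go into fixed-size chunks, so no single
// array ever has to hold the whole output.
final class ChunkedSink(chunkSize: Int) extends OutputStream {
  require(chunkSize > 0, "chunkSize must be positive")
  private val full = ArrayBuffer.empty[Array[Byte]]
  private var current = new Array[Byte](chunkSize)
  private var pos = 0    // position inside the current chunk only
  private var total = 0L // Long, so the total can exceed 2GB

  override def write(b: Int): Unit = {
    if (pos == chunkSize) {
      // Current chunk is full: keep it and start a new one.
      full += current
      current = new Array[Byte](chunkSize)
      pos = 0
    }
    current(pos) = b.toByte
    pos += 1
    total += 1
  }

  def size: Long = total

  // All full chunks, followed by the last chunk trimmed to its used length.
  def toChunks: Array[Array[Byte]] =
    if (total == 0L) Array.empty[Array[Byte]]
    else (full :+ java.util.Arrays.copyOf(current, pos)).toArray
}

object ChunkedSink {
  // Presents the nested array as one InputStream without concatenating it.
  def toInputStream(chunks: Array[Array[Byte]]): InputStream = {
    val streams = chunks.iterator.map(c => new ByteArrayInputStream(c): InputStream)
    new SequenceInputStream(new java.util.Enumeration[InputStream] {
      def hasMoreElements: Boolean = streams.hasNext
      def nextElement(): InputStream = streams.next()
    })
  }
}

object ChunkedSinkDemo {
  def main(args: Array[String]): Unit = {
    val sink = new ChunkedSink(4)
    sink.write("0123456789".getBytes("UTF-8"))
    println(s"total bytes = ${sink.size}")
    println(s"chunk lengths = ${sink.toChunks.map(_.length).mkString(",")}")
    val in = ChunkedSink.toInputStream(sink.toChunks)
    in.read() // drop a leading tag byte before reading the payload
    println(s"first payload byte = ${in.read().toChar}")
  }
}
```

Every chunk except the last has exactly `chunkSize` bytes, which is the property the new unit test checks on the broadcast value with a 1 MB chunk size.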
### Why are the changes needed?
This issue seems to have been missed in the earlier effort to address the 2GB
limitations, [SPARK-6235](https://issues.apache.org/jira/browse/SPARK-6235).
Without this fix, `spark.default.parallelism` needs to be kept at a low
number. The drawback of setting a smaller `spark.default.parallelism` is that
it requires more executor memory (more data per partition). Setting
`spark.io.compression.zstd.level` to a higher number (default 1) hardly helps.
That essentially means there is a data size limit for shuffling, so it
does not scale.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passed existing tests
```
build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite"
```
Also added a new unit test
```
build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite -- -z SPARK-32210"
```
Ran the benchmark using GitHub Actions and did not observe any
performance penalties. The results are attached in this PR:
```
core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
```
Closes #33721 from kazuyukitanimura/SPARK-32210.
Authored-by: Kazuyuki Tanimura <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 8ee464cd7a09302cacc47a4cbc98fdf307f39dbd)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../MapStatusesSerDeserBenchmark-jdk11-results.txt | 54 ++++++++--------
.../MapStatusesSerDeserBenchmark-results.txt | 54 ++++++++--------
.../scala/org/apache/spark/MapOutputTracker.scala | 52 +++++++++-------
.../org/apache/spark/MapOutputTrackerSuite.scala | 72 +++++++++++++++++++++-
.../spark/MapStatusesSerDeserBenchmark.scala | 4 +-
5 files changed, 156 insertions(+), 80 deletions(-)
diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
index 29699a2..0481630 100644
--- a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
+++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
@@ -1,64 +1,64 @@
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------
-Serialization 179 194 9 1.1 897.4 1.0X
-Deserialization 254 321 74 0.8 1271.0 0.7X
+Serialization 148 164 8 1.4 739.6 1.0X
+Deserialization 202 303 72 1.0 1009.9 0.7X
-Compressed Serialized MapStatus sizes: 409 bytes
+Compressed Serialized MapStatus sizes: 412 bytes
Compressed Serialized Broadcast MapStatus sizes: 2 MB
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------
-Serialization 160 166 7 1.2 801.2 1.0X
-Deserialization 256 323 69 0.8 1278.9 0.6X
+Serialization 125 132 9 1.6 623.4 1.0X
+Deserialization 197 277 76 1.0 984.4 0.6X
Compressed Serialized MapStatus sizes: 2 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------
-Serialization 341 349 7 0.6 1707.3 1.0X
-Deserialization 286 370 84 0.7 1431.4 1.2X
+Serialization 260 286 17 0.8 1302.0 1.0X
+Deserialization 224 344 128 0.9 1121.0 1.2X
-Compressed Serialized MapStatus sizes: 426 bytes
+Compressed Serialized MapStatus sizes: 427 bytes
Compressed Serialized Broadcast MapStatus sizes: 13 MB
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-Serialization 309 319 11 0.6 1543.6 1.0X
-Deserialization 286 373 117 0.7 1429.5 1.1X
+Serialization 253 272 14 0.8 1262.9 1.0X
+Deserialization 240 409 150 0.8 1201.0 1.1X
Compressed Serialized MapStatus sizes: 13 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-Serialization 1619 1627 12 0.1 8092.6 1.0X
-Deserialization 864 883 26 0.2 4319.9 1.9X
+Serialization 1361 1378 24 0.1 6805.0 1.0X
+Deserialization 830 1022 272 0.2 4150.1 1.6X
-Compressed Serialized MapStatus sizes: 557 bytes
+Compressed Serialized MapStatus sizes: 562 bytes
Compressed Serialized Broadcast MapStatus sizes: 121 MB
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
----------------------------------------------------------------------------------------------------------------------------
-Serialization 1449 1456 9 0.1 7246.8 1.0X
-Deserialization 853 888 46 0.2 4263.7 1.7X
+Serialization 1216 1251 51 0.2 6078.3 1.0X
+Deserialization 821 968 138 0.2 4105.8 1.5X
Compressed Serialized MapStatus sizes: 121 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
index 96fa3a0..5b005a5 100644
--- a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
+++ b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
@@ -1,64 +1,64 @@
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------
-Serialization 135 161 56 1.5 673.9 1.0X
-Deserialization 213 235 26 0.9 1065.6 0.6X
+Serialization 143 164 55 1.4 716.5 1.0X
+Deserialization 252 300 43 0.8 1262.4 0.6X
-Compressed Serialized MapStatus sizes: 409 bytes
+Compressed Serialized MapStatus sizes: 412 bytes
Compressed Serialized Broadcast MapStatus sizes: 2 MB
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------
-Serialization 130 137 5 1.5 650.8 1.0X
-Deserialization 211 230 20 0.9 1056.5 0.6X
+Serialization 137 139 1 1.5 684.2 1.0X
+Deserialization 252 259 13 0.8 1259.5 0.5X
Compressed Serialized MapStatus sizes: 2 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------
-Serialization 281 324 86 0.7 1406.7 1.0X
-Deserialization 240 267 32 0.8 1200.5 1.2X
+Serialization 279 322 116 0.7 1394.6 1.0X
+Deserialization 275 287 28 0.7 1372.7 1.0X
-Compressed Serialized MapStatus sizes: 426 bytes
+Compressed Serialized MapStatus sizes: 427 bytes
Compressed Serialized Broadcast MapStatus sizes: 13 MB
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-Serialization 265 273 6 0.8 1324.5 1.0X
-Deserialization 247 276 33 0.8 1236.1 1.1X
+Serialization 262 263 1 0.8 1310.3 1.0X
+Deserialization 274 288 22 0.7 1370.5 1.0X
Compressed Serialized MapStatus sizes: 13 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-Serialization 1333 1592 366 0.2 6666.0 1.0X
-Deserialization 560 585 22 0.4 2799.1 2.4X
+Serialization 1208 1208 1 0.2 6038.4 1.0X
+Deserialization 555 783 394 0.4 2774.2 2.2X
-Compressed Serialized MapStatus sizes: 558 bytes
+Compressed Serialized MapStatus sizes: 562 bytes
Compressed Serialized Broadcast MapStatus sizes: 121 MB
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
----------------------------------------------------------------------------------------------------------------------------
-Serialization 1222 1260 54 0.2 6111.7 1.0X
-Deserialization 539 568 42 0.4 2695.3 2.3X
+Serialization 1097 1097 1 0.2 5484.2 1.0X
+Deserialization 554 596 48 0.4 2771.3 2.0X
Compressed Serialized MapStatus sizes: 121 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 1b25ec5..24954e7 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -17,7 +17,8 @@
package org.apache.spark
-import java.io.{ByteArrayInputStream, IOException, ObjectInputStream, ObjectOutputStream}
+import java.io.{ByteArrayInputStream, InputStream, IOException, ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -40,6 +41,7 @@ import org.apache.spark.scheduler.{MapStatus, MergeStatus, ShuffleOutputStatus}
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId}
import org.apache.spark.util._
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
/**
* Helper class used by the [[MapOutputTrackerMaster]] to perform bookkeeping for a single
@@ -121,14 +123,14 @@ private class ShuffleStatus(
* broadcast variable in order to keep it from being garbage collected and to allow for it to be
* explicitly destroyed later on when the ShuffleMapStage is garbage-collected.
*/
- private[spark] var cachedSerializedBroadcast: Broadcast[Array[Byte]] = _
+ private[spark] var cachedSerializedBroadcast: Broadcast[Array[Array[Byte]]] = _
/**
* Similar to cachedSerializedMapStatus and cachedSerializedBroadcast, but for MergeStatus.
*/
private[this] var cachedSerializedMergeStatus: Array[Byte] = _
- private[this] var cachedSerializedBroadcastMergeStatus: Broadcast[Array[Byte]] = _
+ private[this] var cachedSerializedBroadcastMergeStatus: Broadcast[Array[Array[Byte]]] = _
/**
* Counter tracking the number of partitions that have output. This is a performance optimization
@@ -1318,12 +1320,9 @@ private[spark] object MapOutputTracker extends Logging {
broadcastManager: BroadcastManager,
isLocal: Boolean,
minBroadcastSize: Int,
- conf: SparkConf): (Array[Byte], Broadcast[Array[Byte]]) = {
- // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one
- // This implementation doesn't reallocate the whole memory block but allocates
- // additional buffers. This way no buffers need to be garbage collected and
- // the contents don't have to be copied to the new buffer.
- val out = new ApacheByteArrayOutputStream()
+ conf: SparkConf): (Array[Byte], Broadcast[Array[Array[Byte]]]) = {
+ // ByteArrayOutputStream has the 2GB limit so use ChunkedByteBufferOutputStream instead
+ val out = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
out.write(DIRECT)
val codec = CompressionCodec.createCodec(conf, conf.get(MAP_STATUS_COMPRESSION_CODEC))
val objOut = new ObjectOutputStream(codec.compressedOutputStream(out))
@@ -1335,13 +1334,19 @@ private[spark] object MapOutputTracker extends Logging {
} {
objOut.close()
}
- val arr = out.toByteArray
- if (arr.length >= minBroadcastSize) {
+ val chunkedByteBuf = out.toChunkedByteBuffer
+ val arrSize = out.size
+ if (arrSize >= minBroadcastSize) {
// Use broadcast instead.
// Important arr(0) is the tag == DIRECT, ignore that while deserializing !
+ // arr is a nested Array so that it can handle over 2GB serialized data
+ val arr = chunkedByteBuf.getChunks().map(_.array())
val bcast = broadcastManager.newBroadcast(arr, isLocal)
- // toByteArray creates copy, so we can reuse out
- out.reset()
+ // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one
+ // This implementation doesn't reallocate the whole memory block but allocates
+ // additional buffers. This way no buffers need to be garbage collected and
+ // the contents don't have to be copied to the new buffer.
+ val out = new ApacheByteArrayOutputStream()
out.write(BROADCAST)
val oos = new ObjectOutputStream(codec.compressedOutputStream(out))
Utils.tryWithSafeFinally {
@@ -1350,10 +1355,10 @@ private[spark] object MapOutputTracker extends Logging {
oos.close()
}
val outArr = out.toByteArray
- logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual size = " + arr.length)
+ logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual size = " + arrSize)
(outArr, bcast)
} else {
- (arr, null)
+ (chunkedByteBuf.toArray, null)
}
}
@@ -1362,13 +1367,12 @@ private[spark] object MapOutputTracker extends Logging {
bytes: Array[Byte], conf: SparkConf): Array[T] = {
assert (bytes.length > 0)
- def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
+ def deserializeObject(in: InputStream): AnyRef = {
val codec = CompressionCodec.createCodec(conf, conf.get(MAP_STATUS_COMPRESSION_CODEC))
// The ZStd codec is wrapped in a `BufferedInputStream` which avoids overhead excessive
// of JNI call while trying to decompress small amount of data for each element
// of `MapStatuses`
- val objIn = new ObjectInputStream(codec.compressedInputStream(
- new ByteArrayInputStream(arr, off, len)))
+ val objIn = new ObjectInputStream(codec.compressedInputStream(in))
Utils.tryWithSafeFinally {
objIn.readObject()
} {
@@ -1376,18 +1380,20 @@ private[spark] object MapOutputTracker extends Logging {
}
}
+ val in = new ByteArrayInputStream(bytes, 1, bytes.length - 1)
bytes(0) match {
case DIRECT =>
- deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[T]]
+ deserializeObject(in).asInstanceOf[Array[T]]
case BROADCAST =>
try {
// deserialize the Broadcast, pull .value array out of it, and then deserialize that
- val bcast = deserializeObject(bytes, 1, bytes.length - 1).
- asInstanceOf[Broadcast[Array[Byte]]]
+ val bcast = deserializeObject(in).asInstanceOf[Broadcast[Array[Array[Byte]]]]
logInfo("Broadcast outputstatuses size = " + bytes.length +
- ", actual size = " + bcast.value.length)
+ ", actual size = " + bcast.value.foldLeft(0L)(_ + _.length))
+ val bcastIn = new ChunkedByteBuffer(bcast.value.map(ByteBuffer.wrap)).toInputStream()
// Important - ignore the DIRECT tag ! Start from offset 1
- deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[T]]
+ bcastIn.skip(1)
+ deserializeObject(bcastIn).asInstanceOf[Array[T]]
} catch {
case e: IOException =>
logWarning("Exception encountered during deserializing broadcasted" +
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 69cc8c1..e81196f 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, RPC_MESSAGE_MAX_SIZE}
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
-import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus, MergeStatus}
+import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus, MapStatus, MergeStatus}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId}
@@ -664,4 +664,74 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
tracker.stop()
rpcEnv.shutdown()
}
+
+ test("SPARK-32210: serialize mapStatuses to a nested Array and deserialize them") {
+ val newConf = new SparkConf
+
+ // needs TorrentBroadcast so need a SparkContext
+ withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
+ val tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+ val rpcEnv = sc.env.rpcEnv
+ val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, sc.getConf)
+ rpcEnv.stop(tracker.trackerEndpoint)
+ rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
+ val shuffleId = 20
+ val numMaps = 1000
+
+ tracker.registerShuffle(shuffleId, numMaps, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
+ val r = new scala.util.Random(912)
+ (0 until numMaps).foreach { i =>
+ tracker.registerMapOutput(shuffleId, i, HighlyCompressedMapStatus(
+ BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000),
+ Array.fill[Long](1000)((r.nextDouble() * 1024 * 1024 * 1024).toLong), i))
+ }
+
+ val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head
+ val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeOutputStatuses(
+ shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, 0, sc.getConf)
+ assert(serializedBroadcast.value.length > 1)
+ assert(serializedBroadcast.value.dropRight(1).forall(_.length == 1024 * 1024))
+
+ val result = MapOutputTracker.deserializeOutputStatuses(serializedMapStatus, sc.getConf)
+ assert(result.length == numMaps)
+
+ tracker.unregisterShuffle(shuffleId)
+ tracker.stop()
+ }
+ }
+
+ ignore("SPARK-32210: serialize and deserialize over 2GB compressed mapStatuses") {
+ // This test requires 8GB heap memory settings
+ val newConf = new SparkConf
+
+ // needs TorrentBroadcast so need a SparkContext
+ withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
+ val tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+ val rpcEnv = sc.env.rpcEnv
+ val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, sc.getConf)
+ rpcEnv.stop(tracker.trackerEndpoint)
+ rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
+ val shuffleId = 20
+ val numMaps = 200000
+
+ tracker.registerShuffle(shuffleId, numMaps, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
+ val r = new scala.util.Random(912)
+ (0 until numMaps).foreach { i =>
+ tracker.registerMapOutput(shuffleId, i, HighlyCompressedMapStatus(
+ BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000, Some(r.nextString(1024 * 5))),
+ Array.fill(10)((r.nextDouble() * 1024 * 1024 * 1024).toLong), i))
+ }
+
+ val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head
+ val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeOutputStatuses(
+ shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, 0, sc.getConf)
+ assert(serializedBroadcast.value.foldLeft(0L)(_ + _.length) > 2L * 1024 * 1024 * 1024)
+
+ val result = MapOutputTracker.deserializeOutputStatuses(serializedMapStatus, sc.getConf)
+ assert(result.length == numMaps)
+
+ tracker.unregisterShuffle(shuffleId)
+ tracker.stop()
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala
index d808823..bb627bb 100644
--- a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala
@@ -64,14 +64,14 @@ object MapStatusesSerDeserBenchmark extends BenchmarkBase {
val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head
var serializedMapStatusSizes = 0
- var serializedBroadcastSizes = 0
+ var serializedBroadcastSizes = 0L
val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeOutputStatuses(
shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize,
sc.getConf)
serializedMapStatusSizes = serializedMapStatus.length
if (serializedBroadcast != null) {
- serializedBroadcastSizes = serializedBroadcast.value.length
+ serializedBroadcastSizes = serializedBroadcast.value.foldLeft(0L)(_ + _.length)
}
benchmark.addCase("Serialization") { _ =>