This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 9149cad  [SPARK-32210][CORE] Fix NegativeArraySizeException in 
MapOutputTracker with large spark.default.parallelism
9149cad is described below

commit 9149cad57d04f51e246f7a61cd62577cbec73190
Author: Kazuyuki Tanimura <ktanim...@apple.com>
AuthorDate: Mon Aug 16 09:11:39 2021 -0700

    [SPARK-32210][CORE] Fix NegativeArraySizeException in MapOutputTracker with 
large spark.default.parallelism
    
    ### What changes were proposed in this pull request?
    The current `MapOutputTracker` class may throw `NegativeArraySizeException` 
with a large number of partitions. Within the serializeOutputStatuses() method, 
it is trying to compress an array of mapStatuses and outputting the binary data 
into (Apache)ByteArrayOutputStream . Inside the 
(Apache)ByteArrayOutputStream.toByteArray(), negative index exception happens 
because the index is int and overflows (2GB limit) when the output binary size 
is too large.
    
    This PR proposes two high-level ideas:
      1. Use `org.apache.spark.util.io.ChunkedByteBufferOutputStream`, which 
has a way to output the underlying buffer as `Array[Array[Byte]]`.
      2. Change the signatures from `Array[Byte]` to `Array[Array[Byte]]` in 
order to handle over 2GB compressed data.
    
    ### Why are the changes needed?
    This issue seems to be missed out in the earlier effort of addressing 2GB 
limitations [SPARK-6235](https://issues.apache.org/jira/browse/SPARK-6235)
    
    Without this fix, `spark.default.parallelism` needs to be kept at the low 
number. The drawback of setting smaller spark.default.parallelism is that it 
requires more executor memory (more data per partition). Setting 
`spark.io.compression.zstd.level` to higher number (default 1) hardly helps.
    
    That essentially means we have the data size limit that for shuffling and 
does not scale.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Passed existing tests
    ```
    build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite"
    ```
    Also added a new unit test
    ```
    build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite  -- -z 
SPARK-32210"
    ```
    Ran the benchmark using GitHub Actions and didn't not observe any 
performance penalties. The results are attached in this PR
    ```
    core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
    core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
    ```
    
    Closes #33721 from kazuyukitanimura/SPARK-32210.
    
    Authored-by: Kazuyuki Tanimura <ktanim...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 8ee464cd7a09302cacc47a4cbc98fdf307f39dbd)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../MapStatusesSerDeserBenchmark-jdk11-results.txt | 54 ++++++++--------
 .../MapStatusesSerDeserBenchmark-results.txt       | 54 ++++++++--------
 .../scala/org/apache/spark/MapOutputTracker.scala  | 52 +++++++++-------
 .../org/apache/spark/MapOutputTrackerSuite.scala   | 72 +++++++++++++++++++++-
 .../spark/MapStatusesSerDeserBenchmark.scala       |  4 +-
 5 files changed, 156 insertions(+), 80 deletions(-)

diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt 
b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
index 29699a2..0481630 100644
--- a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
+++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
@@ -1,64 +1,64 @@
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 200000 MapOutputs, 10 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------
-Serialization                                        179            194        
   9          1.1         897.4       1.0X
-Deserialization                                      254            321        
  74          0.8        1271.0       0.7X
+Serialization                                        148            164        
   8          1.4         739.6       1.0X
+Deserialization                                      202            303        
  72          1.0        1009.9       0.7X
 
-Compressed Serialized MapStatus sizes: 409 bytes
+Compressed Serialized MapStatus sizes: 412 bytes
 Compressed Serialized Broadcast MapStatus sizes: 2 MB
 
 
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 200000 MapOutputs, 10 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
--------------------------------------------------------------------------------------------------------------------------
-Serialization                                         160            166       
    7          1.2         801.2       1.0X
-Deserialization                                       256            323       
   69          0.8        1278.9       0.6X
+Serialization                                         125            132       
    9          1.6         623.4       1.0X
+Deserialization                                       197            277       
   76          1.0         984.4       0.6X
 
 Compressed Serialized MapStatus sizes: 2 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
 
 
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 200000 MapOutputs, 100 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
--------------------------------------------------------------------------------------------------------------------------
-Serialization                                         341            349       
    7          0.6        1707.3       1.0X
-Deserialization                                       286            370       
   84          0.7        1431.4       1.2X
+Serialization                                         260            286       
   17          0.8        1302.0       1.0X
+Deserialization                                       224            344       
  128          0.9        1121.0       1.2X
 
-Compressed Serialized MapStatus sizes: 426 bytes
+Compressed Serialized MapStatus sizes: 427 bytes
 Compressed Serialized Broadcast MapStatus sizes: 13 MB
 
 
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 200000 MapOutputs, 100 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
---------------------------------------------------------------------------------------------------------------------------
-Serialization                                          309            319      
    11          0.6        1543.6       1.0X
-Deserialization                                        286            373      
   117          0.7        1429.5       1.1X
+Serialization                                          253            272      
    14          0.8        1262.9       1.0X
+Deserialization                                        240            409      
   150          0.8        1201.0       1.1X
 
 Compressed Serialized MapStatus sizes: 13 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
 
 
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 200000 MapOutputs, 1000 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
---------------------------------------------------------------------------------------------------------------------------
-Serialization                                         1619           1627      
    12          0.1        8092.6       1.0X
-Deserialization                                        864            883      
    26          0.2        4319.9       1.9X
+Serialization                                         1361           1378      
    24          0.1        6805.0       1.0X
+Deserialization                                        830           1022      
   272          0.2        4150.1       1.6X
 
-Compressed Serialized MapStatus sizes: 557 bytes
+Compressed Serialized MapStatus sizes: 562 bytes
 Compressed Serialized Broadcast MapStatus sizes: 121 MB
 
 
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 200000 MapOutputs, 1000 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
----------------------------------------------------------------------------------------------------------------------------
-Serialization                                          1449           1456     
      9          0.1        7246.8       1.0X
-Deserialization                                         853            888     
     46          0.2        4263.7       1.7X
+Serialization                                          1216           1251     
     51          0.2        6078.3       1.0X
+Deserialization                                         821            968     
    138          0.2        4105.8       1.5X
 
 Compressed Serialized MapStatus sizes: 121 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt 
b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
index 96fa3a0..5b005a5 100644
--- a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
+++ b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
@@ -1,64 +1,64 @@
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 200000 MapOutputs, 10 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------
-Serialization                                        135            161        
  56          1.5         673.9       1.0X
-Deserialization                                      213            235        
  26          0.9        1065.6       0.6X
+Serialization                                        143            164        
  55          1.4         716.5       1.0X
+Deserialization                                      252            300        
  43          0.8        1262.4       0.6X
 
-Compressed Serialized MapStatus sizes: 409 bytes
+Compressed Serialized MapStatus sizes: 412 bytes
 Compressed Serialized Broadcast MapStatus sizes: 2 MB
 
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 200000 MapOutputs, 10 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
--------------------------------------------------------------------------------------------------------------------------
-Serialization                                         130            137       
    5          1.5         650.8       1.0X
-Deserialization                                       211            230       
   20          0.9        1056.5       0.6X
+Serialization                                         137            139       
    1          1.5         684.2       1.0X
+Deserialization                                       252            259       
   13          0.8        1259.5       0.5X
 
 Compressed Serialized MapStatus sizes: 2 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
 
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 200000 MapOutputs, 100 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
--------------------------------------------------------------------------------------------------------------------------
-Serialization                                         281            324       
   86          0.7        1406.7       1.0X
-Deserialization                                       240            267       
   32          0.8        1200.5       1.2X
+Serialization                                         279            322       
  116          0.7        1394.6       1.0X
+Deserialization                                       275            287       
   28          0.7        1372.7       1.0X
 
-Compressed Serialized MapStatus sizes: 426 bytes
+Compressed Serialized MapStatus sizes: 427 bytes
 Compressed Serialized Broadcast MapStatus sizes: 13 MB
 
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 200000 MapOutputs, 100 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
---------------------------------------------------------------------------------------------------------------------------
-Serialization                                          265            273      
     6          0.8        1324.5       1.0X
-Deserialization                                        247            276      
    33          0.8        1236.1       1.1X
+Serialization                                          262            263      
     1          0.8        1310.3       1.0X
+Deserialization                                        274            288      
    22          0.7        1370.5       1.0X
 
 Compressed Serialized MapStatus sizes: 13 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
 
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 200000 MapOutputs, 1000 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
---------------------------------------------------------------------------------------------------------------------------
-Serialization                                         1333           1592      
   366          0.2        6666.0       1.0X
-Deserialization                                        560            585      
    22          0.4        2799.1       2.4X
+Serialization                                         1208           1208      
     1          0.2        6038.4       1.0X
+Deserialization                                        555            783      
   394          0.4        2774.2       2.2X
 
-Compressed Serialized MapStatus sizes: 558 bytes
+Compressed Serialized MapStatus sizes: 562 bytes
 Compressed Serialized Broadcast MapStatus sizes: 121 MB
 
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 200000 MapOutputs, 1000 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
----------------------------------------------------------------------------------------------------------------------------
-Serialization                                          1222           1260     
     54          0.2        6111.7       1.0X
-Deserialization                                         539            568     
     42          0.4        2695.3       2.3X
+Serialization                                          1097           1097     
      1          0.2        5484.2       1.0X
+Deserialization                                         554            596     
     48          0.4        2771.3       2.0X
 
 Compressed Serialized MapStatus sizes: 121 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 1b25ec5..24954e7 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark
 
-import java.io.{ByteArrayInputStream, IOException, ObjectInputStream, 
ObjectOutputStream}
+import java.io.{ByteArrayInputStream, InputStream, IOException, 
ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, 
ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
@@ -40,6 +41,7 @@ import org.apache.spark.scheduler.{MapStatus, MergeStatus, 
ShuffleOutputStatus}
 import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, 
ShuffleMergedBlockId}
 import org.apache.spark.util._
+import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
 
 /**
  * Helper class used by the [[MapOutputTrackerMaster]] to perform bookkeeping 
for a single
@@ -121,14 +123,14 @@ private class ShuffleStatus(
    * broadcast variable in order to keep it from being garbage collected and 
to allow for it to be
    * explicitly destroyed later on when the ShuffleMapStage is 
garbage-collected.
    */
-  private[spark] var cachedSerializedBroadcast: Broadcast[Array[Byte]] = _
+  private[spark] var cachedSerializedBroadcast: Broadcast[Array[Array[Byte]]] 
= _
 
   /**
    * Similar to cachedSerializedMapStatus and cachedSerializedBroadcast, but 
for MergeStatus.
    */
   private[this] var cachedSerializedMergeStatus: Array[Byte] = _
 
-  private[this] var cachedSerializedBroadcastMergeStatus: 
Broadcast[Array[Byte]] = _
+  private[this] var cachedSerializedBroadcastMergeStatus: 
Broadcast[Array[Array[Byte]]] = _
 
   /**
    * Counter tracking the number of partitions that have output. This is a 
performance optimization
@@ -1318,12 +1320,9 @@ private[spark] object MapOutputTracker extends Logging {
       broadcastManager: BroadcastManager,
       isLocal: Boolean,
       minBroadcastSize: Int,
-      conf: SparkConf): (Array[Byte], Broadcast[Array[Byte]]) = {
-    // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of 
the standard one
-    // This implementation doesn't reallocate the whole memory block but 
allocates
-    // additional buffers. This way no buffers need to be garbage collected and
-    // the contents don't have to be copied to the new buffer.
-    val out = new ApacheByteArrayOutputStream()
+      conf: SparkConf): (Array[Byte], Broadcast[Array[Array[Byte]]]) = {
+    // ByteArrayOutputStream has the 2GB limit so use 
ChunkedByteBufferOutputStream instead
+    val out = new ChunkedByteBufferOutputStream(1024 * 1024, 
ByteBuffer.allocate)
     out.write(DIRECT)
     val codec = CompressionCodec.createCodec(conf, 
conf.get(MAP_STATUS_COMPRESSION_CODEC))
     val objOut = new ObjectOutputStream(codec.compressedOutputStream(out))
@@ -1335,13 +1334,19 @@ private[spark] object MapOutputTracker extends Logging {
     } {
       objOut.close()
     }
-    val arr = out.toByteArray
-    if (arr.length >= minBroadcastSize) {
+    val chunkedByteBuf = out.toChunkedByteBuffer
+    val arrSize = out.size
+    if (arrSize >= minBroadcastSize) {
       // Use broadcast instead.
       // Important arr(0) is the tag == DIRECT, ignore that while 
deserializing !
+      // arr is a nested Array so that it can handle over 2GB serialized data
+      val arr = chunkedByteBuf.getChunks().map(_.array())
       val bcast = broadcastManager.newBroadcast(arr, isLocal)
-      // toByteArray creates copy, so we can reuse out
-      out.reset()
+      // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of 
the standard one
+      // This implementation doesn't reallocate the whole memory block but 
allocates
+      // additional buffers. This way no buffers need to be garbage collected 
and
+      // the contents don't have to be copied to the new buffer.
+      val out = new ApacheByteArrayOutputStream()
       out.write(BROADCAST)
       val oos = new ObjectOutputStream(codec.compressedOutputStream(out))
       Utils.tryWithSafeFinally {
@@ -1350,10 +1355,10 @@ private[spark] object MapOutputTracker extends Logging {
         oos.close()
       }
       val outArr = out.toByteArray
-      logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual 
size = " + arr.length)
+      logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual 
size = " + arrSize)
       (outArr, bcast)
     } else {
-      (arr, null)
+      (chunkedByteBuf.toArray, null)
     }
   }
 
@@ -1362,13 +1367,12 @@ private[spark] object MapOutputTracker extends Logging {
       bytes: Array[Byte], conf: SparkConf): Array[T] = {
     assert (bytes.length > 0)
 
-    def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
+    def deserializeObject(in: InputStream): AnyRef = {
       val codec = CompressionCodec.createCodec(conf, 
conf.get(MAP_STATUS_COMPRESSION_CODEC))
       // The ZStd codec is wrapped in a `BufferedInputStream` which avoids 
overhead excessive
       // of JNI call while trying to decompress small amount of data for each 
element
       // of `MapStatuses`
-      val objIn = new ObjectInputStream(codec.compressedInputStream(
-        new ByteArrayInputStream(arr, off, len)))
+      val objIn = new ObjectInputStream(codec.compressedInputStream(in))
       Utils.tryWithSafeFinally {
         objIn.readObject()
       } {
@@ -1376,18 +1380,20 @@ private[spark] object MapOutputTracker extends Logging {
       }
     }
 
+    val in = new ByteArrayInputStream(bytes, 1, bytes.length - 1)
     bytes(0) match {
       case DIRECT =>
-        deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[T]]
+        deserializeObject(in).asInstanceOf[Array[T]]
       case BROADCAST =>
         try {
           // deserialize the Broadcast, pull .value array out of it, and then 
deserialize that
-          val bcast = deserializeObject(bytes, 1, bytes.length - 1).
-            asInstanceOf[Broadcast[Array[Byte]]]
+          val bcast = 
deserializeObject(in).asInstanceOf[Broadcast[Array[Array[Byte]]]]
           logInfo("Broadcast outputstatuses size = " + bytes.length +
-            ", actual size = " + bcast.value.length)
+            ", actual size = " + bcast.value.foldLeft(0L)(_ + _.length))
+          val bcastIn = new 
ChunkedByteBuffer(bcast.value.map(ByteBuffer.wrap)).toInputStream()
           // Important - ignore the DIRECT tag ! Start from offset 1
-          deserializeObject(bcast.value, 1, bcast.value.length - 
1).asInstanceOf[Array[T]]
+          bcastIn.skip(1)
+          deserializeObject(bcastIn).asInstanceOf[Array[T]]
         } catch {
           case e: IOException =>
             logWarning("Exception encountered during deserializing 
broadcasted" +
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 69cc8c1..e81196f 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, 
RPC_MESSAGE_MAX_SIZE}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
-import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus, MergeStatus}
+import org.apache.spark.scheduler.{CompressedMapStatus, 
HighlyCompressedMapStatus, MapStatus, MergeStatus}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId, 
ShuffleMergedBlockId}
 
@@ -664,4 +664,74 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
     tracker.stop()
     rpcEnv.shutdown()
   }
+
+  test("SPARK-32210: serialize mapStatuses to a nested Array and deserialize 
them") {
+    val newConf = new SparkConf
+
+    // needs TorrentBroadcast so need a SparkContext
+    withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { 
sc =>
+      val tracker = 
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+      val rpcEnv = sc.env.rpcEnv
+      val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, 
sc.getConf)
+      rpcEnv.stop(tracker.trackerEndpoint)
+      rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
+      val shuffleId = 20
+      val numMaps = 1000
+
+      tracker.registerShuffle(shuffleId, numMaps, 
MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
+      val r = new scala.util.Random(912)
+      (0 until numMaps).foreach { i =>
+        tracker.registerMapOutput(shuffleId, i, HighlyCompressedMapStatus(
+          BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000),
+          Array.fill[Long](1000)((r.nextDouble() * 1024 * 1024 * 
1024).toLong), i))
+      }
+
+      val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head
+      val (serializedMapStatus, serializedBroadcast) = 
MapOutputTracker.serializeOutputStatuses(
+        shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, 
0, sc.getConf)
+      assert(serializedBroadcast.value.length > 1)
+      assert(serializedBroadcast.value.dropRight(1).forall(_.length == 1024 * 
1024))
+
+      val result = 
MapOutputTracker.deserializeOutputStatuses(serializedMapStatus, sc.getConf)
+      assert(result.length == numMaps)
+
+      tracker.unregisterShuffle(shuffleId)
+      tracker.stop()
+    }
+  }
+
+  ignore("SPARK-32210: serialize and deserialize over 2GB compressed 
mapStatuses") {
+    // This test requires 8GB heap memory settings
+    val newConf = new SparkConf
+
+    // needs TorrentBroadcast so need a SparkContext
+    withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { 
sc =>
+      val tracker = 
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+      val rpcEnv = sc.env.rpcEnv
+      val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, 
sc.getConf)
+      rpcEnv.stop(tracker.trackerEndpoint)
+      rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
+      val shuffleId = 20
+      val numMaps = 200000
+
+      tracker.registerShuffle(shuffleId, numMaps, 
MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
+      val r = new scala.util.Random(912)
+      (0 until numMaps).foreach { i =>
+        tracker.registerMapOutput(shuffleId, i, HighlyCompressedMapStatus(
+          BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000, 
Some(r.nextString(1024 * 5))),
+          Array.fill(10)((r.nextDouble() * 1024 * 1024 * 1024).toLong), i))
+      }
+
+      val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head
+      val (serializedMapStatus, serializedBroadcast) = 
MapOutputTracker.serializeOutputStatuses(
+        shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, 
0, sc.getConf)
+      assert(serializedBroadcast.value.foldLeft(0L)(_ + _.length) > 2L * 1024 
* 1024 * 1024)
+
+      val result = 
MapOutputTracker.deserializeOutputStatuses(serializedMapStatus, sc.getConf)
+      assert(result.length == numMaps)
+
+      tracker.unregisterShuffle(shuffleId)
+      tracker.stop()
+    }
+  }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala 
b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala
index d808823..bb627bb 100644
--- a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala
@@ -64,14 +64,14 @@ object MapStatusesSerDeserBenchmark extends BenchmarkBase {
     val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head
 
     var serializedMapStatusSizes = 0
-    var serializedBroadcastSizes = 0
+    var serializedBroadcastSizes = 0L
 
     val (serializedMapStatus, serializedBroadcast) = 
MapOutputTracker.serializeOutputStatuses(
       shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, 
minBroadcastSize,
       sc.getConf)
     serializedMapStatusSizes = serializedMapStatus.length
     if (serializedBroadcast != null) {
-      serializedBroadcastSizes = serializedBroadcast.value.length
+      serializedBroadcastSizes = serializedBroadcast.value.foldLeft(0L)(_ + 
_.length)
     }
 
     benchmark.addCase("Serialization") { _ =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to