[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13959127#comment-13959127 ]

Min Zhou commented on SPARK-1391:
---------------------------------

Yes. The communication layer uses an array of ByteBuffers to transfer messages, 
but the receiver converts them back into BlockMessages, where each block 
corresponds to a single ByteBuffer and therefore cannot be larger than 2GB. 
Those BlockMessages are then consumed by connection callers all over the 
codebase, in places we can't control. 
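
For reference, the 2GB ceiling comes from java.nio.ByteBuffer itself: its 
capacity, position, and limit are all ints. A minimal Scala sketch (demo code, 
not from Spark) of why one-block-per-ByteBuffer caps out, even though the wire 
layer chunks the transfer:

{noformat}
import java.nio.ByteBuffer

object ByteBufferLimitDemo {
  def main(args: Array[String]): Unit = {
    val blockSize: Long = 3L * 1024 * 1024 * 1024  // a 3GB cached block

    // ByteBuffer.allocate takes an Int, so one buffer tops out at
    // Integer.MAX_VALUE bytes (~2GB). A 3GB block cannot fit in one:
    // ByteBuffer.allocate(blockSize)  // won't compile: Long where Int expected

    // The sender can split the payload into chunks for the wire...
    val chunkSize = 64 * 1024 * 1024
    val numChunks = ((blockSize + chunkSize - 1) / chunkSize).toInt
    println(s"$blockSize bytes => $numChunks chunks of <= $chunkSize bytes")
    // ...but the receiver reassembles each block into a single ByteBuffer,
    // and that is where the 2GB limit bites.
  }
}
{noformat}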

One approach is to write a CompositeByteBuffer to overcome the 2GB limitation, 
but that still can't get around other limits baked into the ByteBuffer 
interface: ByteBuffer.position(), ByteBuffer.capacity(), and 
ByteBuffer.remaining() all return int, so their values are capped at 
Integer.MAX_VALUE. 
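
To make that concrete, here is a rough sketch (hypothetical code, not an 
actual patch) of such a composite buffer: it can report a Long capacity by 
chaining chunks, but it cannot extend java.nio.ByteBuffer, so every call site 
that expects a plain ByteBuffer would still have to change:

{noformat}
import java.nio.ByteBuffer

// Hypothetical sketch: chain several ByteBuffers so the aggregate can
// exceed 2GB. It cannot extend java.nio.ByteBuffer, whose position()/
// capacity()/remaining() return Int, so a Long-based API is exposed
// instead, which is exactly why existing callers would break.
class CompositeByteBuffer(chunks: Array[ByteBuffer]) {
  // Long capacity can exceed Integer.MAX_VALUE.
  val capacity: Long = chunks.map(_.capacity.toLong).sum
  private var pos: Long = 0L

  def position: Long = pos
  def remaining: Long = capacity - pos

  // Read the byte at the current Long position, then advance.
  def get(): Byte = {
    var off = pos
    var i = 0
    while (off >= chunks(i).capacity) { off -= chunks(i).capacity; i += 1 }
    pos += 1
    chunks(i).get(off.toInt)  // indexing within one chunk stays under 2GB
  }
}

// Usage: four 1GB chunks back a 4GB logical buffer.
// val big = new CompositeByteBuffer(Array.fill(4)(ByteBuffer.allocate(1 << 30)))
{noformat}

Since BlockMessages are consumed by connection callers all over the codebase, 
a wrapper with a different API means touching every one of those call sites.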

> BlockManager cannot transfer blocks larger than 2G in size
> ----------------------------------------------------------
>
>                 Key: SPARK-1391
>                 URL: https://issues.apache.org/jira/browse/SPARK-1391
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Shuffle
>    Affects Versions: 1.0.0
>            Reporter: Shivaram Venkataraman
>            Assignee: Min Zhou
>
> If a task tries to remotely access a cached RDD block, I get an exception 
> when the block size is > 2G. The exception is pasted below.
> Memory capacities are huge these days (> 60G), and many workflows depend on 
> having large blocks in memory, so it would be good to fix this bug.
> I don't know if the same thing happens on shuffles if one transfer (from 
> mapper to reducer) is > 2G.
> {noformat}
> 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer message
> java.lang.ArrayIndexOutOfBoundsException
>         at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96)
>         at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134)
>         at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164)
>         at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>         at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38)
>         at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93)
>         at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26)
>         at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913)
>         at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922)
>         at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102)
>         at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348)
>         at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323)
>         at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)
>         at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
>         at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
>         at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661)
>         at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.2#6252)
