[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14343900#comment-14343900 ]
Reynold Xin commented on SPARK-1391:
------------------------------------
@squito if you want to attempt something this large and core to the whole
engine, it would be better to do this incrementally, especially when this is a
part of the code that you are less familiar with.
I'd suggest breaking this task down in the following way, and getting feedback
incrementally as well.
1. LargeByteBuffer interface. (This alone deserves its own design doc, focused
on how it integrates with ManagedBuffer.)
2. Block storage
3. Block fetching
4. Block upload
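
To make step 1 concrete, here is a minimal sketch of one way a buffer could grow past 2G: a list of fixed-size chunks addressed by a long position. The class name ChunkedByteBuffer and all of its methods are hypothetical, made up for illustration; this is not the LargeByteBuffer interface from the attached design doc, and it says nothing about ManagedBuffer integration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only: not Spark's LargeByteBuffer or ManagedBuffer API.
final class ChunkedByteBuffer {
    private final List<ByteBuffer> chunks = new ArrayList<>();
    private final int chunkSize;
    private long size = 0L; // total bytes written, tracked as a long

    ChunkedByteBuffer(int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.chunkSize = chunkSize;
    }

    // Append one byte, allocating a new chunk whenever the last one is full.
    void put(byte b) {
        int offsetInChunk = (int) (size % chunkSize);
        if (offsetInChunk == 0) {
            chunks.add(ByteBuffer.allocate(chunkSize));
        }
        chunks.get(chunks.size() - 1).put(offsetInChunk, b);
        size++;
    }

    // Read one byte at a long position by mapping it to (chunk, offset).
    byte get(long position) {
        if (position < 0 || position >= size) {
            throw new IndexOutOfBoundsException("position " + position);
        }
        int chunkIndex = (int) (position / chunkSize);
        int offsetInChunk = (int) (position % chunkSize);
        return chunks.get(chunkIndex).get(offsetInChunk);
    }

    long size() {
        return size;
    }

    int chunkCount() {
        return chunks.size();
    }
}
```

Each chunk stays under the int limit while the total size is a long. A real design would also need bulk reads and writes and a way to hand chunks to the network layer without copying.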
Uploading blocks > 2G is extremely rare: upload is rarely used outside of
streaming, and streaming data blocks are usually small. It would also be much
simpler to deal with upload if we change it to send a message to the other end
and let the other end download the blocks instead.
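
The "send a message and let the other end download" idea can be sketched as follows. PullNode and its methods are hypothetical names, and direct method calls stand in for the network; the only point is that upload reuses the fetch path instead of needing a separate large-block push path.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: in-memory stand-in for two block managers.
final class PullNode {
    private final Map<String, byte[]> blocks = new HashMap<>();

    void putLocal(String blockId, byte[] data) {
        blocks.put(blockId, data);
    }

    boolean hasBlock(String blockId) {
        return blocks.containsKey(blockId);
    }

    // The existing fetch path: serve a locally stored block to a remote caller.
    byte[] serveBlock(String blockId) {
        byte[] data = blocks.get(blockId);
        if (data == null) {
            throw new IllegalArgumentException("unknown block " + blockId);
        }
        return data;
    }

    // "Upload" sends only a small message naming the block, not the bytes.
    void requestUpload(PullNode target, String blockId) {
        target.onUploadRequest(this, blockId);
    }

    // The receiver reacts by downloading the block through the fetch path.
    void onUploadRequest(PullNode source, String blockId) {
        blocks.put(blockId, source.serveBlock(blockId));
    }
}
```

With this shape, whatever fix makes fetching work for blocks > 2G would also cover upload.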
> BlockManager cannot transfer blocks larger than 2G in size
> ----------------------------------------------------------
>
> Key: SPARK-1391
> URL: https://issues.apache.org/jira/browse/SPARK-1391
> Project: Spark
> Issue Type: Improvement
> Components: Block Manager, Shuffle
> Affects Versions: 1.0.0
> Reporter: Shivaram Venkataraman
> Attachments: BlockLimitDesign.pdf, SPARK-1391.diff
>
>
> If a task tries to remotely access a cached RDD block, I get an exception
> when the block size is > 2G. The exception is pasted below.
> Memory capacities are huge these days (> 60G), and many workflows depend on
> having large blocks in memory, so it would be good to fix this bug.
> I don't know if the same thing happens on shuffles if one transfer (from
> mapper to reducer) is > 2G.
> {noformat}
> 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer message
> java.lang.ArrayIndexOutOfBoundsException
>     at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96)
>     at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134)
>     at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164)
>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38)
>     at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93)
>     at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26)
>     at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913)
>     at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922)
>     at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102)
>     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348)
>     at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323)
>     at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)
>     at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)
>     at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
>     at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
>     at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44)
>     at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
>     at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
>     at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661)
>     at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:744)
> {noformat}
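
For context on the 2G figure: Java arrays and java.nio.ByteBuffer are indexed by int, so a single buffer cannot hold more than Integer.MAX_VALUE (2^31 - 1) bytes. The sketch below shows how a size just past that limit wraps negative when narrowed to int. TwoGigLimit and its methods are hypothetical names, and the trace above does not by itself confirm that this exact narrowing is what fails inside fastutil; it is one plausible mechanism.

```java
// Hypothetical sketch only: illustrates int narrowing, not Spark or fastutil code.
final class TwoGigLimit {
    // A plain cast silently wraps: 2^31 becomes Integer.MIN_VALUE.
    static int naiveCapacity(long requestedBytes) {
        return (int) requestedBytes;
    }

    // Math.toIntExact throws ArithmeticException instead of wrapping.
    static int checkedCapacity(long requestedBytes) {
        return Math.toIntExact(requestedBytes);
    }

    public static void main(String[] args) {
        long twoGiB = 1L << 31; // one byte past Integer.MAX_VALUE
        System.out.println(naiveCapacity(twoGiB));
        try {
            checkedCapacity(twoGiB);
        } catch (ArithmeticException e) {
            System.out.println("overflow detected: " + e.getMessage());
        }
    }
}
```

A negative or wrapped length used as an array index or capacity is the kind of value that surfaces as an ArrayIndexOutOfBoundsException rather than a clear "block too large" error.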