[
https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14329605#comment-14329605
]
Imran Rashid commented on SPARK-1391:
-------------------------------------
[~coderplay], I assume you are no longer looking at this, right? I'm going to
take a crack at this issue if you don't mind. Here is my plan, copied from
SPARK-1476 (now that I've untangled those issues a little bit):
I'd like to start on it, with the following very minimal goals:
1. Make it possible for blocks to be bigger than 2GB
2. Maintain performance on smaller blocks
i.e., I'm not going to try to do anything fancy to optimize performance of the
large blocks. To that end, my plan is to:
1. Create a {{LargeByteBuffer}} interface, which just has the same methods we
use on {{ByteBuffer}}.
2. Have one implementation that just wraps one {{ByteBuffer}}, and another that
wraps a completely static set of {{ByteBuffer}}s (e.g., if you map a 3 GB file,
it will immediately map it to two {{ByteBuffer}}s; nothing fancy like only
mapping the first half of the file until the second is needed). A rough sketch
follows this list.
3. Change {{ByteBuffer}} to {{LargeByteBuffer}} in {{BlockStore}}.
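To make the shape of this concrete, here is a rough sketch of what I have in
mind. To be clear, this is just an illustration, not the final API: the trait
name, the method set, and the two implementation classes
({{WrappedLargeByteBuffer}}, {{ChainedLargeByteBuffer}}) are placeholders, and
the real version would need to cover whatever operations {{BlockStore}}
actually uses (position, duplicate, dispose, etc.).
{code:scala}
import java.nio.ByteBuffer

// Illustrative only: a Long-addressed view over one or more ByteBuffers, exposing
// just a couple of the read operations BlockStore would need.
trait LargeByteBuffer {
  def capacity: Long
  def remaining: Long
  def get(): Byte
  def get(dst: Array[Byte], offset: Int, length: Int): Unit
}

// Common case: the block still fits in a single ByteBuffer, so just delegate.
class WrappedLargeByteBuffer(buf: ByteBuffer) extends LargeByteBuffer {
  override def capacity: Long = buf.capacity()
  override def remaining: Long = buf.remaining()
  override def get(): Byte = buf.get()
  override def get(dst: Array[Byte], offset: Int, length: Int): Unit =
    buf.get(dst, offset, length)
}

// Large case: a completely static set of ByteBuffers created up front (e.g. a 3 GB
// mapped file split into two chunks immediately); reads walk the chunks in order.
class ChainedLargeByteBuffer(chunks: Array[ByteBuffer]) extends LargeByteBuffer {
  private var current = 0

  override def capacity: Long = chunks.map(_.capacity().toLong).sum
  override def remaining: Long = chunks.drop(current).map(_.remaining().toLong).sum

  // Skip over exhausted chunks so the next read comes from one with data left.
  private def advance(): Unit = {
    while (current < chunks.length - 1 && !chunks(current).hasRemaining) {
      current += 1
    }
  }

  override def get(): Byte = {
    advance()
    chunks(current).get()
  }

  override def get(dst: Array[Byte], offset: Int, length: Int): Unit = {
    require(length <= remaining, s"only $remaining bytes left, asked for $length")
    var written = 0
    while (written < length) {
      advance()
      val chunk = chunks(current)
      val toRead = math.min(length - written, chunk.remaining())
      chunk.get(dst, offset + written, toRead)
      written += toRead
    }
  }
}
{code}
With something like that in place, step 3 is mostly mechanical: the
{{BlockStore}} methods that currently take or return {{ByteBuffer}} would take
or return {{LargeByteBuffer}}, and existing single-buffer paths just get
wrapped.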
I see that about a year back there was a lot of discussion on this in
SPARK-1476, along with some alternate proposals. I'd like to push forward with
a POC to try to move the discussion along again. I know there was some
discussion about how important this is and whether or not we want to support
it. IMO this is a big limitation that results in a lot of frustration for
users; we really need a solution for this.
I could still be missing something, but I believe this should also solve
SPARK-3151.
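For anyone coming to this fresh, the reason the limit exists at all is that
{{ByteBuffer}} and on-heap arrays are indexed with {{Int}}, so a single buffer
can never hold more than {{Integer.MAX_VALUE}} bytes (roughly 2 GB). A tiny,
purely illustrative example (not from the issue itself):
{code:scala}
import java.nio.ByteBuffer

object TwoGBLimit {
  def main(args: Array[String]): Unit = {
    val threeGB: Long = 3L * 1024 * 1024 * 1024

    // ByteBuffer.allocate only takes an Int, so a 3 GB request overflows to a
    // negative number and allocation fails before we ever get near the real size.
    try {
      ByteBuffer.allocate(threeGB.toInt)
    } catch {
      case e: IllegalArgumentException =>
        println(s"cannot allocate $threeGB bytes in one buffer: $e")
    }
  }
}
{code}
That is why the plan above never tries to make one buffer bigger; it only
stitches several {{Int}}-sized buffers together behind a {{Long}}-addressed
interface.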
> BlockManager cannot transfer blocks larger than 2G in size
> ----------------------------------------------------------
>
> Key: SPARK-1391
> URL: https://issues.apache.org/jira/browse/SPARK-1391
> Project: Spark
> Issue Type: Improvement
> Components: Block Manager, Shuffle
> Affects Versions: 1.0.0
> Reporter: Shivaram Venkataraman
> Attachments: SPARK-1391.diff
>
>
> If a task tries to remotely access a cached RDD block, I get an exception
> when the block size is > 2G. The exception is pasted below.
> Memory capacities are huge these days (> 60G), and many workflows depend on
> having large blocks in memory, so it would be good to fix this bug.
> I don't know if the same thing happens on shuffles if one transfer (from
> mapper to reducer) is > 2G.
> {noformat}
> 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer message
> java.lang.ArrayIndexOutOfBoundsException
>         at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96)
>         at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134)
>         at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164)
>         at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>         at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38)
>         at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93)
>         at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26)
>         at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913)
>         at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922)
>         at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102)
>         at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348)
>         at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323)
>         at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)
>         at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
>         at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
>         at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661)
>         at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
> {noformat}