[
https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14338648#comment-14338648
]
Imran Rashid commented on SPARK-1391:
-------------------------------------
The one complication here comes from the network transfer required by
replication. If we ignore the {{NioBlockTransferService}} for now and just
look at {{NettyBlockTransferService}}, the existing behavior is:
1. Replication results in a request to
[{{NettyBlockTransferService#uploadBlocks}} |
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala#L126],
which sends an {{UploadBlock}} msg to a peer. The {{UploadBlock}} message
contains the full payload, which is limited to 2GB currently.
2. The message is received by [{{NettyBlockRpcServer}} |
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala#L62]
where it is simply deserialized and inserted into the local block manager.
I'm thinking we could break a block apart into multiple messages, e.g.
{{UploadPartialBlock}}, with each message limited to 2GB (or even less). Then
{{NettyBlockRpcServer}} would queue up all the messages, and once it had
received them all it would put them together and insert the block locally.
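To make the idea concrete, here is a rough sketch of the split and reassembly steps. Everything in it is hypothetical: the {{UploadPartialBlock}} fields ({{index}}, {{total}}), the {{split}} helper and the {{Assembler}} class are names made up for illustration, not existing Spark code. It also uses small {{byte[]}} payloads, which a real block over 2GB could not be held in, so it only illustrates the protocol and not the actual buffer handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartialBlockSketch {

    /** Hypothetical message: one slice of a block plus the metadata needed to reassemble it. */
    static final class UploadPartialBlock {
        final String blockId;
        final int index;   // position of this chunk within the block
        final int total;   // number of chunks the block was split into
        final byte[] payload;

        UploadPartialBlock(String blockId, int index, int total, byte[] payload) {
            this.blockId = blockId;
            this.index = index;
            this.total = total;
            this.payload = payload;
        }
    }

    /** Sender side: cut the data into chunks of at most maxChunkBytes each. */
    static List<UploadPartialBlock> split(String blockId, byte[] data, int maxChunkBytes) {
        if (maxChunkBytes <= 0) {
            throw new IllegalArgumentException("maxChunkBytes must be positive");
        }
        // Long arithmetic avoids int overflow; an empty block still yields one empty chunk.
        int total = (int) Math.max(1L, ((long) data.length + maxChunkBytes - 1) / maxChunkBytes);
        List<UploadPartialBlock> chunks = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            int from = (int) Math.min((long) i * maxChunkBytes, data.length);
            int to = (int) Math.min((long) from + maxChunkBytes, data.length);
            chunks.add(new UploadPartialBlock(blockId, i, total, Arrays.copyOfRange(data, from, to)));
        }
        return chunks;
    }

    /** Receiver side: buffers chunks per block and hands back the block once it is complete. */
    static final class Assembler {
        private final Map<String, byte[][]> pending = new HashMap<>();
        private final Map<String, Integer> receivedCount = new HashMap<>();

        /** Returns the reassembled block when the last missing chunk arrives, otherwise null. */
        byte[] accept(UploadPartialBlock msg) {
            byte[][] slots = pending.computeIfAbsent(msg.blockId, id -> new byte[msg.total][]);
            if (slots[msg.index] != null) {
                return null; // duplicate chunk, ignore it
            }
            slots[msg.index] = msg.payload;
            int count = receivedCount.merge(msg.blockId, 1, Integer::sum);
            if (count < slots.length) {
                return null; // still waiting for more chunks
            }
            pending.remove(msg.blockId);
            receivedCount.remove(msg.blockId);
            int size = 0;
            for (byte[] slot : slots) {
                size += slot.length;
            }
            byte[] whole = new byte[size];
            int offset = 0;
            for (byte[] slot : slots) {
                System.arraycopy(slot, 0, whole, offset, slot.length);
                offset += slot.length;
            }
            return whole;
        }

        /** Number of blocks that have some, but not all, of their chunks buffered. */
        int pendingBlocks() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        List<UploadPartialBlock> chunks = split("rdd_0_0", data, 4); // 3 chunks: 4 + 4 + 2 bytes
        Assembler assembler = new Assembler();
        byte[] result = null;
        // Deliver in reverse order to show that arrival order does not matter.
        for (int i = chunks.size() - 1; i >= 0; i--) {
            result = assembler.accept(chunks.get(i));
        }
        System.out.println(Arrays.equals(data, result)); // true
    }
}
```

Carrying {{index}} and {{total}} in every message is one possible choice; it lets the receiver tolerate reordering and duplicates without a separate "start" message.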
My concern with that approach is robustness: what happens if some of the
{{UploadPartialBlock}} messages never arrive, for whatever reason? We wouldn't
want {{NettyBlockRpcServer}} to hold on to those partial messages in memory
indefinitely. Does it make sense to introduce a timeout? Once the first
{{UploadPartialBlock}} is received, the server would wait only a limited time
for the remaining messages before discarding the partial block.
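A minimal sketch of what the timeout bookkeeping might look like, again purely illustrative: the class and method names are invented, and the clock is passed in as a plain {{long}} so the behavior is easy to check. A real server would presumably also release the buffered payloads when a block is evicted and run the eviction from some periodic task.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class PartialBlockTimeoutSketch {
    private final long timeoutMillis;
    // blockId -> arrival time of the first chunk seen for that block
    private final Map<String, Long> firstChunkAt = new HashMap<>();

    PartialBlockTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Record a chunk arrival; only the first chunk of a block starts the clock. */
    void onChunk(String blockId, long nowMillis) {
        firstChunkAt.putIfAbsent(blockId, nowMillis);
    }

    /** Called once a block has been fully reassembled and stored. */
    void onComplete(String blockId) {
        firstChunkAt.remove(blockId);
    }

    /** Drop every block that has been incomplete for at least timeoutMillis; returns how many. */
    int evictExpired(long nowMillis) {
        int evicted = 0;
        Iterator<Map.Entry<String, Long>> it = firstChunkAt.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= timeoutMillis) {
                it.remove(); // a real server would also free the buffered chunks here
                evicted++;
            }
        }
        return evicted;
    }

    boolean isPending(String blockId) {
        return firstChunkAt.containsKey(blockId);
    }

    public static void main(String[] args) {
        PartialBlockTimeoutSketch tracker = new PartialBlockTimeoutSketch(1000);
        tracker.onChunk("rdd_0_0", 0);
        tracker.onChunk("rdd_0_1", 600);
        System.out.println(tracker.evictExpired(1200)); // 1: only rdd_0_0 has waited >= 1000 ms
    }
}
```

Here the clock starts at the first chunk and is not reset by later ones, matching the wording above; resetting it on every chunk would be an alternative that favors slow but steady senders.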
> BlockManager cannot transfer blocks larger than 2G in size
> ----------------------------------------------------------
>
> Key: SPARK-1391
> URL: https://issues.apache.org/jira/browse/SPARK-1391
> Project: Spark
> Issue Type: Improvement
> Components: Block Manager, Shuffle
> Affects Versions: 1.0.0
> Reporter: Shivaram Venkataraman
> Attachments: SPARK-1391.diff
>
>
> If a task tries to remotely access a cached RDD block, I get an exception
> when the block size is > 2G. The exception is pasted below.
> Memory capacities are huge these days (> 60G), and many workflows depend on
> having large blocks in memory, so it would be good to fix this bug.
> I don't know if the same thing happens on shuffles if one transfer (from
> mapper to reducer) is > 2G.
> {noformat}
> 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer message
> java.lang.ArrayIndexOutOfBoundsException
> at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96)
> at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134)
> at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164)
> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38)
> at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93)
> at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26)
> at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913)
> at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922)
> at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102)
> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348)
> at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323)
> at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)
> at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
> at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
> at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661)
> at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> {noformat}