Re: RDD collect hangs on large input data
Thanks for your answer, Imran. I haven't tried your suggestions yet, but setting spark.shuffle.blockTransferService=nio solved my issue. There is a JIRA for this: https://issues.apache.org/jira/browse/SPARK-6962.

Zsolt

2015-04-14 21:57 GMT+02:00 Imran Rashid iras...@cloudera.com:

Is it possible that when you switch to the bigger data set your data is skewed, so that some tasks generate far more data? reduceByKey could result in a huge amount of data going to a small number of tasks. I'd suggest:

(a) seeing what happens if you don't collect() -- e.g. writing to HDFS with saveAsObjectFile instead;

(b) taking a look at what is happening on the executors with the long-running tasks. You can get thread dumps via the UI (or you can log in to the boxes and use jstack). This might point to some of your code that is taking a long time, or it might point to Spark internals.
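For reference, a minimal sketch of how the workaround can be applied in code; the application name is illustrative, and the same property can equally be passed to spark-submit as --conf spark.shuffle.blockTransferService=nio:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    SparkConf conf = new SparkConf()
        .setAppName("collect-job")                         // illustrative name
        .set("spark.shuffle.blockTransferService", "nio"); // fall back from the Netty transfer service to NIO
    JavaSparkContext sc = new JavaSparkContext(conf);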
Re: RDD collect hangs on large input data
I use EMR 3.3.1, which comes with Java 7. Do you think this may cause the issue? Did you test it with Java 8?
Re: RDD collect hangs on large input data
Zsolt - what version of Java are you running?
Re: RDD collect hangs on large input data
Thanks for your answer! I don't call .collect() to trigger the execution; I call it because I need the RDD on the driver. This is not a huge RDD, and it's no larger than the one returned for the 50GB input.

The end of the log is below. The two IPs are the two worker nodes; I think they can't connect to the driver after they finish their part of the collect().

15/03/30 10:38:25 INFO executor.Executor: Finished task 872.0 in stage 1.0 (TID 1745). 1414 bytes result sent to driver
15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(200) called with curMem=405753, maxMem=4883742720
15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_867 stored as values in memory (estimated size 200.0 B, free 4.5 GB)
15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(80) called with curMem=405953, maxMem=4883742720
15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_868 stored as values in memory (estimated size 80.0 B, free 4.5 GB)
15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_867
15/03/30 10:38:25 INFO executor.Executor: Finished task 867.0 in stage 1.0 (TID 1740). 1440 bytes result sent to driver
15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_868
15/03/30 10:38:25 INFO executor.Executor: Finished task 868.0 in stage 1.0 (TID 1741). 1422 bytes result sent to driver
15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:42026
java.io.IOException: Connection timed out
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:41703
java.io.IOException: Connection timed out
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 10:53:46 WARN server.TransportChannelHandler: Exception in connection from /10.99.144.92:49021
java.io.IOException: Connection timed out
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
Re: RDD collect hangs on large input data
Don't call .collect() if your data size is huge; you can simply do a count() to trigger the execution. Can you paste your exception stack trace so that we'll know what's happening?

Thanks
Best Regards
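For illustration, the difference between the two actions on the final RDD of the job described below (output is an illustrative variable name):

    // Triggers the full computation but sends only a single count back to the driver:
    long n = output.count();

    // as opposed to pulling every element of the RDD into driver memory:
    List<String> result = output.collect();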
RDD collect hangs on large input data
Hi,

I have a simple Spark application: it creates an input RDD with sc.textFile, and calls flatMapToPair, reduceByKey and map on it. The output RDD is small, a few MBs. Then I call collect() on the output.

If the text file is ~50GB, the job finishes in a few minutes. However, if it's larger (~100GB), the execution hangs at the end of the collect() stage. The UI shows one active job (collect), one completed stage (flatMapToPair) and one active stage (collect). The collect stage has 880/892 tasks succeeded, so I think the issue happens near the very end of the job (every task on the UI is either in SUCCESS or RUNNING state). The driver and the containers don't log anything for 15 minutes, then I get a connection timeout.

I run the job in yarn-cluster mode on Amazon EMR with Spark 1.2.1 and Hadoop 2.4.0. This happens every time I run the process with the larger input, so I don't think it's just a transient connection issue. Is this a Spark bug, or is something wrong with my setup?

Zsolt
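For context, a minimal self-contained sketch of the kind of job described above, using the Spark 1.2 Java API (Java 7 style, no lambdas). The class name, input path and the tokenize/key logic are illustrative stand-ins, not the actual application:

    import java.util.ArrayList;
    import java.util.List;

    import scala.Tuple2;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFlatMapFunction;

    public class CollectExample {                              // illustrative class name
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("collect-example");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // ~50-100GB of text input; the path is a placeholder
        JavaRDD<String> input = sc.textFile("hdfs:///path/to/input");

        // flatMapToPair: emit (key, 1) pairs -- the tokenization is a stand-in
        JavaPairRDD<String, Long> pairs = input.flatMapToPair(
            new PairFlatMapFunction<String, String, Long>() {
              public Iterable<Tuple2<String, Long>> call(String line) {
                List<Tuple2<String, Long>> out = new ArrayList<Tuple2<String, Long>>();
                for (String token : line.split("\\s+")) {
                  out.add(new Tuple2<String, Long>(token, 1L));
                }
                return out;
              }
            });

        // reduceByKey: shuffle, then sum the values per key
        JavaPairRDD<String, Long> counts = pairs.reduceByKey(
            new Function2<Long, Long, Long>() {
              public Long call(Long a, Long b) { return a + b; }
            });

        // map to the final (small, a few MB) output RDD
        JavaRDD<String> output = counts.map(
            new Function<Tuple2<String, Long>, String>() {
              public String call(Tuple2<String, Long> t) { return t._1() + "\t" + t._2(); }
            });

        // pull the small result to the driver -- the step that hangs on ~100GB input
        List<String> result = output.collect();
        System.out.println("collected " + result.size() + " records");

        sc.stop();
      }
    }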