Re: RDD collect hangs on large input data

2015-04-17 Thread Zsolt Tóth
Thanks for your answer, Imran. I haven't tried your suggestions yet, but
setting spark.shuffle.blockTransferService=nio solved my issue. There is a
JIRA for this: https://issues.apache.org/jira/browse/SPARK-6962.
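For anyone hitting the same thing on Spark 1.x, the setting can be passed at submit time. A sketch of the invocation (the class and jar names are placeholders, not from this thread):

```shell
# SPARK-6962 workaround: fall back from the default netty shuffle
# transport to the older nio one. This is a Spark 1.x setting and was
# removed in later releases. Class/jar names below are placeholders.
spark-submit \
  --master yarn-cluster \
  --conf spark.shuffle.blockTransferService=nio \
  --class com.example.MyJob \
  my-job.jar
```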

Zsolt

2015-04-14 21:57 GMT+02:00 Imran Rashid iras...@cloudera.com:

 is it possible that when you switch to the bigger data set, your data is
 skewed, so some tasks generate far more data than others?  reduceByKey could
 send a huge amount of data to a small number of tasks.  I'd suggest

 (a) seeing what happens if you don't collect() -- e.g. instead try writing
 to HDFS with saveAsObjectFile.
 (b) taking a look at what is happening on the executors with the long
 running tasks.  You can get thread dumps via the UI (or you can log into
 the boxes and use jstack).  This might point to some of your code that is
 taking a long time, or it might point to Spark internals.
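 The skew scenario above can be sketched outside Spark with plain Python
 (toy data, hypothetical key names): under hash partitioning, a single hot
 key pins most of the shuffle output onto one reduce task.

```python
from collections import Counter

NUM_PARTITIONS = 4

def partition_for(key):
    # Rough stand-in for Spark's HashPartitioner: hash(key) mod partitions.
    return hash(key) % NUM_PARTITIONS

# Skewed toy data: 90% of the records share one hot key.
records = ["hot_key"] * 900 + ["key_%d" % i for i in range(100)]

load = Counter(partition_for(k) for k in records)

# Every "hot_key" record lands in the same partition, so one reduce
# task receives at least 900 of the 1000 records.
print(sorted(load.values(), reverse=True))
```

 The same concentration happens with reduceByKey on a skewed key
 distribution, which is why one task can run far longer than the rest.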

 On Wed, Apr 8, 2015 at 3:45 AM, Zsolt Tóth toth.zsolt@gmail.com
 wrote:

 I use EMR 3.3.1 which comes with Java 7. Do you think that this may cause
 the issue? Did you test it with Java 8?





Re: RDD collect hangs on large input data

2015-04-08 Thread Zsolt Tóth
I use EMR 3.3.1 which comes with Java 7. Do you think that this may cause
the issue? Did you test it with Java 8?


Re: RDD collect hangs on large input data

2015-04-07 Thread Jon Chase
Zsolt - what version of Java are you running?

On Mon, Mar 30, 2015 at 7:12 AM, Zsolt Tóth toth.zsolt@gmail.com
wrote:

Re: RDD collect hangs on large input data

2015-03-30 Thread Zsolt Tóth
Thanks for your answer!
I don't call .collect() to trigger the execution; I call it because I need
the RDD on the driver. It's not a huge RDD, and it's not larger than the
one returned for the 50GB input data.

The end of the stack trace:

The two IPs belong to the two worker nodes; I think they can't connect to
the driver after they finish their part of the collect().

15/03/30 10:38:25 INFO executor.Executor: Finished task 872.0 in stage 1.0 (TID 1745). 1414 bytes result sent to driver
15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(200) called with curMem=405753, maxMem=4883742720
15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_867 stored as values in memory (estimated size 200.0 B, free 4.5 GB)
15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(80) called with curMem=405953, maxMem=4883742720
15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_868 stored as values in memory (estimated size 80.0 B, free 4.5 GB)
15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_867
15/03/30 10:38:25 INFO executor.Executor: Finished task 867.0 in stage 1.0 (TID 1740). 1440 bytes result sent to driver
15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_868
15/03/30 10:38:25 INFO executor.Executor: Finished task 868.0 in stage 1.0 (TID 1741). 1422 bytes result sent to driver
15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:42026
java.io.IOException: Connection timed out
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:41703
java.io.IOException: Connection timed out
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 10:53:46 WARN server.TransportChannelHandler: Exception in connection from /10.99.144.92:49021
java.io.IOException: Connection timed out
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)

Re: RDD collect hangs on large input data

2015-03-29 Thread Akhil Das
Don't call .collect() if your data size is huge; you can simply do a count()
to trigger the execution.

Can you paste your exception stack trace so that we'll know what's happening?

Thanks
Best Regards

On Fri, Mar 27, 2015 at 9:18 PM, Zsolt Tóth toth.zsolt@gmail.com
wrote:




RDD collect hangs on large input data

2015-03-27 Thread Zsolt Tóth
Hi,

I have a simple Spark application: it creates an input RDD with
sc.textFile, and calls flatMapToPair, reduceByKey and map on it. The
output RDD is small, a few MBs. Then I call collect() on the output.
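As a rough local sketch of that pipeline shape (plain Python standing in for flatMapToPair, reduceByKey and map; the input lines are made up), showing why the reduced output is much smaller than the input:

```python
from collections import defaultdict

# Hypothetical input lines standing in for the HDFS text file.
lines = ["a b a", "b c", "a c c"]

# flatMapToPair: emit one (token, 1) pair per token.
pairs = [(token, 1) for line in lines for token in line.split()]

# reduceByKey: sum the counts per key.
counts = defaultdict(int)
for key, value in pairs:
    counts[key] += value

# map: shape the final, much smaller result that collect() would return.
result = sorted(counts.items())
print(result)  # [('a', 3), ('b', 2), ('c', 3)]
```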

If the textfile is ~50GB, it finishes in a few minutes. However, if it's
larger (~100GB), the execution hangs at the end of the collect() stage. The
UI shows one active job (collect), one completed stage (flatMapToPair) and
one active stage (collect). The collect stage has 880/892 tasks succeeded,
so I think the issue occurs when the whole job is about to finish (every
task on the UI is either in SUCCESS or in RUNNING state).
The driver and the containers don't log anything for 15 minutes, then I get
Connection timed out.

I run the job in yarn-cluster mode on Amazon EMR with Spark 1.2.1 and
Hadoop 2.4.0.

This happens every time I run the process with the larger input data, so I
think this isn't just a transient connection issue. Is this a Spark bug, or
is something wrong with my setup?

Zsolt