[ 
https://issues.apache.org/jira/browse/SPARK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14064996#comment-14064996
 ] 

Ken Carlile commented on SPARK-2282:
------------------------------------

We've just tried this on a 32-node cluster. Without the two sysctl settings it 
failed, as expected, using this code in pyspark: 

{code}
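# 30M elements in 2000 partitions; each element is mapped to a 300-item range
data = sc.parallelize(range(0, 30000000), 2000).map(lambda x: range(0, 300))
data.cache()
data.count()  # first count materializes the cache
# 20 more counts, each finishing 2000 short tasks
for i in range(0, 20): data.count()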
{code}
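
For a sense of scale: the issue description quoted at the end of this message says every task completion opens a new socket. The sketch below is a back-of-the-envelope estimate only. It assumes one task per partition per count(), an ephemeral port range of 32768-61000, and a 60-second TIME_WAIT; those values are assumptions rather than measurements from this cluster, and the helper name is made up for illustration.

```python
def port_budget(port_low=32768, port_high=61000, time_wait_s=60):
    """Return (usable ports, sustainable new connections per second).

    The defaults are assumed values, not settings read from this cluster;
    on a real host, check /proc/sys/net/ipv4/ip_local_port_range.
    """
    ports = port_high - port_low + 1
    # A port stuck in TIME_WAIT is unavailable for time_wait_s seconds,
    # so this is the steady-state rate at which ports are freed again.
    return ports, ports / float(time_wait_s)


partitions = 2000      # second argument to sc.parallelize in the snippet above
count_calls = 1 + 20   # the initial count() plus the loop of 20
total_tasks = partitions * count_calls  # assumes one task per partition

ports, per_second = port_budget()
print("tasks (one socket each, per the issue description): %d" % total_tasks)
print("ephemeral ports under the assumed range: %d" % ports)
print("sustainable new connections per second: %.0f" % per_second)
```

Under these assumptions the loop creates more short-lived connections than there are ephemeral ports, so whether the limit is actually hit depends on how quickly the counts complete.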

Unfortunately, it also failed with the two sysctls applied on all nodes in the 
cluster. Here are the Java errors we see: 
{code:java}
14/07/17 10:55:37 ERROR DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext
java.net.NoRouteToHostException: Cannot assign requested address
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:579)
        at java.net.Socket.connect(Socket.java:528)
        at java.net.Socket.<init>(Socket.java:425)
        at java.net.Socket.<init>(Socket.java:208)
        at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:404)
        at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:387)
        at org.apache.spark.Accumulable.$plus$plus$eq(Accumulators.scala:72)
        at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:280)
        at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:278)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.Accumulators$.add(Accumulators.scala:278)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:820)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1226)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 708, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 699, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 619, in reduce
    vals = self.mapPartitions(func).collect()
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 583, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/usr/local/spark-current/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/usr/local/spark-current/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o158.collect.
: org.apache.spark.SparkException: Job 14 cancelled as part of cancellation of all jobs
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
        at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1009)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply$mcVI$sp(DAGScheduler.scala:499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:499)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.doCancelAllJobs(DAGScheduler.scala:499)
        at org.apache.spark.scheduler.DAGSchedulerActorSupervisor$$anonfun$2.applyOrElse(DAGScheduler.scala:1170)
        at org.apache.spark.scheduler.DAGSchedulerActorSupervisor$$anonfun$2.applyOrElse(DAGScheduler.scala:1166)
        at akka.actor.SupervisorStrategy.handleFailure(FaultHandling.scala:295)
        at akka.actor.dungeon.FaultHandling$class.handleFailure(FaultHandling.scala:253)
        at akka.actor.ActorCell.handleFailure(ActorCell.scala:338)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:423)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
        at akka.dispatch.Mailbox.run(Mailbox.scala:218)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

>>> 14/07/17 10:55:38 ERROR OneForOneStrategy: Cannot assign requested address
java.net.NoRouteToHostException: Cannot assign requested address
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:579)
        at java.net.Socket.connect(Socket.java:528)
        at java.net.Socket.<init>(Socket.java:425)
        at java.net.Socket.<init>(Socket.java:208)
        at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:404)
        at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:387)
        at org.apache.spark.Accumulable.$plus$plus$eq(Accumulators.scala:72)
        at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:280)
        at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:278)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.Accumulators$.add(Accumulators.scala:278)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:820)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1226)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

I did not see an inordinate number of connections in netstat; here is a sample 
of the output at around 10 iterations. 

{code}
Every 2.0s: netstat -lpn                                Thu Jul 17 11:06:09 2014

Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address               Foreign Address             State       PID/Program name
tcp        0      0 10.38.103.37:7077           0.0.0.0:*                   LISTEN      17945/java
tcp        0      0 0.0.0.0:1191                0.0.0.0:*                   LISTEN      5870/mmfsd
tcp        0      0 0.0.0.0:4040                0.0.0.0:*                   LISTEN      19916/java
tcp        0      0 10.38.103.37:43721          0.0.0.0:*                   LISTEN      19916/java
tcp        0      0 0.0.0.0:111                 0.0.0.0:*                   LISTEN      3385/rpcbind
tcp        0      0 0.0.0.0:8080                0.0.0.0:*                   LISTEN      17945/java
tcp        0      0 0.0.0.0:42833               0.0.0.0:*                   LISTEN      19916/java
tcp        0      0 0.0.0.0:37429               0.0.0.0:*                   LISTEN      19916/java
tcp        0      0 0.0.0.0:22                  0.0.0.0:*                   LISTEN      3827/sshd
tcp        0      0 127.0.0.1:25                0.0.0.0:*                   LISTEN      5937/master
tcp        0      0 127.0.0.1:55227             0.0.0.0:*                   LISTEN      19907/python
tcp        0      0 127.0.0.1:37723             0.0.0.0:*                   LISTEN      19916/java
tcp        0      0 0.0.0.0:6463                0.0.0.0:*                   LISTEN      4688/sge_execd
tcp        0      0 0.0.0.0:35072               0.0.0.0:*                   LISTEN      -
tcp        0      0 0.0.0.0:47457               0.0.0.0:*                   LISTEN      3495/rpc.statd
tcp        0      0 0.0.0.0:5666                0.0.0.0:*                   LISTEN      18741/nrpe
tcp        0      0 0.0.0.0:48451               0.0.0.0:*                   LISTEN      19916/java
udp        0      0 0.0.0.0:41547               0.0.0.0:*                               -
udp        0      0 0.0.0.0:56397               0.0.0.0:*                               14452/rsyslogd
udp        0      0 0.0.0.0:51920               0.0.0.0:*                               3495/rpc.statd
udp        0      0 0.0.0.0:111                 0.0.0.0:*                               3385/rpcbind
udp        0      0 0.0.0.0:1012                0.0.0.0:*                               3385/rpcbind
udp        0      0 0.0.0.0:631                 0.0.0.0:*                               3324/portreserve
udp        0      0 10.38.103.37:123            0.0.0.0:*                               5700/ntpd
udp        0      0 10.36.103.37:123            0.0.0.0:*                               5700/ntpd
udp        0      0 127.0.0.1:123               0.0.0.0:*                               5700/ntpd
udp        0      0 0.0.0.0:123                 0.0.0.0:*                               5700/ntpd
udp        0      0 0.0.0.0:703                 0.0.0.0:*                               3495/rpc.statd
Active UNIX domain sockets (only servers)
Proto RefCnt Flags       Type       State         I-Node PID/Program name    Path
unix  2      [ ACC ]     STREAM     LISTENING     14434  3671/sssd_nss       /var/lib/sss/pipes/nss
unix  2      [ ACC ]     STREAM     LISTENING     14442  3672/sssd_pam       /var/lib/sss/pipes/pam
unix  2      [ ACC ]     STREAM     LISTENING     21858  5937/master         public/cleanup
unix  2      [ ACC ]     STREAM     LISTENING     21865  5937/master         private/tlsmgr
unix  2      [ ACC ]     STREAM     LISTENING     21869  5937/master         private/rewrite
unix  2      [ ACC ]     STREAM     LISTENING     21873  5937/master         private/bounce
unix  2      [ ACC ]     STREAM     LISTENING     21877  5937/master         private/defer
unix  2      [ ACC ]     STREAM     LISTENING     21881  5937/master         private/trace
unix  2      [ ACC ]     STREAM     LISTENING     21885  5937/master         private/verify
unix  2      [ ACC ]     STREAM     LISTENING     21889  5937/master         public/flush
unix  2      [ ACC ]     STREAM     LISTENING     21893  5937/master         private/proxymap
unix  2      [ ACC ]     STREAM     LISTENING     21897  5937/master         private/proxywrite
unix  2      [ ACC ]     STREAM     LISTENING     21901  5937/master         private/smtp
unix  2      [ ACC ]     STREAM     LISTENING     21905  5937/master         private/relay
unix  2      [ ACC ]     STREAM     LISTENING     21909  5937/master         public/showq
unix  2      [ ACC ]     STREAM     LISTENING     21913  5937/master         private/error
unix  2      [ ACC ]     STREAM     LISTENING     21917  5937/master         private/retry
unix  2      [ ACC ]     STREAM     LISTENING     21921  5937/master         private/discard
unix  2      [ ACC ]     STREAM     LISTENING     21925  5937/master         private/local
unix  2      [ ACC ]     STREAM     LISTENING     21929  5937/master         private/virtual
unix  2      [ ACC ]     STREAM     LISTENING     21933  5937/master         private/lmtp
unix  2      [ ACC ]     STREAM     LISTENING     21937  5937/master         private/anvil
unix  2      [ ACC ]     STREAM     LISTENING     21941  5937/master         private/scache
unix  2      [ ACC ]     STREAM     LISTENING     23356  5870/mmfsd          /var/mmfs/mmpmon/mmpmonSocket
unix  2      [ ACC ]     STREAM     LISTENING     8544   1/init              @/com/ubuntu/upstart
unix  2      [ ACC ]     STREAM     LISTENING     14408  3669/sssd           /var/lib/sss/pipes/private/sbus-monitor
unix  2      [ ACC ]     STREAM     LISTENING     13457  3385/rpcbind        /var/run/rpcbind.sock
unix  2      [ ACC ]     STREAM     LISTENING     14967  3813/mcelog         /var/run/mcelog-client
unix  2      [ ACC ]     STREAM     LISTENING     13603  3477/dbus-daemon    /var/run/dbus/system_bus_socket
unix  2      [ ACC ]     STREAM     LISTENING     14444  3672/sssd_pam       /var/lib/sss/pipes/private/pam
unix  2      [ ACC ]     STREAM     LISTENING     14017  3608/hald           @/var/run/hald/dbus-bOXq61fPG8
unix  2      [ ACC ]     STREAM     LISTENING     21315  5740/uuidd          /var/run/uuidd/request
unix  2      [ ACC ]     STREAM     LISTENING     14012  3608/hald           @/var/run/hald/dbus-cI10ZZX1oL
unix  2      [ ACC ]     STREAM     LISTENING     14417  3670/sssd_be        /var/lib/sss/pipes/private/sbus-dp_default.3670
{code}
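
One caveat about the listing above: the -l flag restricts netstat to listening sockets, so connections lingering in TIME_WAIT would not show up in it. A minimal Python sketch for tallying sockets by state on a Linux node follows. It assumes the usual /proc/net/tcp layout (hex state code in the fourth column); the function names are hypothetical helpers, not part of Spark or any library.

```python
from collections import Counter

# Subset of the kernel's TCP state codes as printed in /proc/net/tcp.
TCP_STATES = {
    "01": "ESTABLISHED",
    "06": "TIME_WAIT",
    "08": "CLOSE_WAIT",
    "0A": "LISTEN",
}


def count_tcp_states(lines):
    """Tally sockets by state from lines in /proc/net/tcp format."""
    counts = Counter()
    for line in lines:
        fields = line.split()
        # Skip blank lines and the header row, which starts with "sl".
        if len(fields) < 4 or fields[0] == "sl":
            continue
        code = fields[3]
        # Unknown codes are kept as-is rather than dropped.
        counts[TCP_STATES.get(code, code)] += 1
    return counts


def count_tcp_states_from_proc(path="/proc/net/tcp"):
    """Read the live IPv4 socket table (Linux only)."""
    with open(path) as f:
        return count_tcp_states(f)
```

Calling count_tcp_states_from_proc() on the driver node while the loop runs would show whether TIME_WAIT entries are piling up. IPv6 sockets are listed separately in /proc/net/tcp6.
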
The cluster is composed of 32 identical Dell R620s, each with two 8-core Sandy 
Bridge Xeons (16 cores per server), 128GB RAM, and 10Gb Ethernet. Latency is 
~0.1ms between all servers in the cluster. One node runs as the master, with 
the other 31 as slaves. 

Please let me know if you need more information or whether this looks like a 
different bug. 

> PySpark crashes if too many tasks complete quickly
> --------------------------------------------------
>
>                 Key: SPARK-2282
>                 URL: https://issues.apache.org/jira/browse/SPARK-2282
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 0.9.1, 1.0.0, 1.0.1
>            Reporter: Aaron Davidson
>            Assignee: Aaron Davidson
>             Fix For: 0.9.2, 1.0.0, 1.0.1
>
>
> Upon every task completion, PythonAccumulatorParam constructs a new socket to 
> the Accumulator server running inside the pyspark daemon. This can cause a 
> buildup of used ephemeral ports from sockets in the TIME_WAIT termination 
> stage, which will cause the SparkContext to crash if too many tasks complete 
> too quickly. We ran into this bug with 17k tasks completing in 15 seconds.
> This bug can be fixed outside of Spark by ensuring these properties are set 
> (on a Linux server):
> echo "1" > /proc/sys/net/ipv4/tcp_tw_reuse
> echo "1" > /proc/sys/net/ipv4/tcp_tw_recycle
> or by adding the SO_REUSEADDR option to the Socket creation within Spark.
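
For illustration, this is roughly what the last option in the description looks like in code. The socket in the stack trace is created on the JVM side (PythonAccumulatorParam in PythonRDD.scala), so the Python sketch below is only an analogue of the idea, not the Spark change itself, and the helper name is hypothetical. Whether this option alone avoids the port exhaustion depends on how the operating system handles TIME_WAIT, which is not verified here.

```python
import socket


def make_reusable_socket():
    """Create an unconnected TCP socket with SO_REUSEADDR enabled."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Set the option before bind() or connect(); it has no effect afterwards.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    return sock


sock = make_reusable_socket()
# A nonzero value means the option is enabled on this socket.
print(sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) != 0)
sock.close()
```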



