[ https://issues.apache.org/jira/browse/SPARK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14064996#comment-14064996 ]
Ken Carlile commented on SPARK-2282:
------------------------------------

So we've just given this a try with a 32-node cluster. Without the two sysctl commands, it failed as expected, using this code in pyspark:

{code}
data = sc.parallelize(range(0,30000000), 2000).map(lambda x: range(0,300))
data.cache()
data.count()
for i in range(0,20):
    data.count()
{code}

Unfortunately, with the two sysctls implemented on all nodes in the cluster, it also failed. Here are the Java errors we see:

{code:java}
14/07/17 10:55:37 ERROR DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext
java.net.NoRouteToHostException: Cannot assign requested address
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at java.net.Socket.connect(Socket.java:528)
	at java.net.Socket.<init>(Socket.java:425)
	at java.net.Socket.<init>(Socket.java:208)
	at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:404)
	at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:387)
	at org.apache.spark.Accumulable.$plus$plus$eq(Accumulators.scala:72)
	at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:280)
	at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:278)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
	at org.apache.spark.Accumulators$.add(Accumulators.scala:278)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1226)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 708, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 699, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 619, in reduce
    vals = self.mapPartitions(func).collect()
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 583, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/usr/local/spark-current/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/usr/local/spark-current/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o158.collect.
: org.apache.spark.SparkException: Job 14 cancelled as part of cancellation of all jobs
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1009)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply$mcVI$sp(DAGScheduler.scala:499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:499)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.doCancelAllJobs(DAGScheduler.scala:499)
	at org.apache.spark.scheduler.DAGSchedulerActorSupervisor$$anonfun$2.applyOrElse(DAGScheduler.scala:1170)
	at org.apache.spark.scheduler.DAGSchedulerActorSupervisor$$anonfun$2.applyOrElse(DAGScheduler.scala:1166)
	at akka.actor.SupervisorStrategy.handleFailure(FaultHandling.scala:295)
	at akka.actor.dungeon.FaultHandling$class.handleFailure(FaultHandling.scala:253)
	at akka.actor.ActorCell.handleFailure(ActorCell.scala:338)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:423)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
	at akka.dispatch.Mailbox.run(Mailbox.scala:218)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> 14/07/17 10:55:38 ERROR OneForOneStrategy: Cannot assign requested address
java.net.NoRouteToHostException: Cannot assign requested address
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at java.net.Socket.connect(Socket.java:528)
	at java.net.Socket.<init>(Socket.java:425)
	at java.net.Socket.<init>(Socket.java:208)
	at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:404)
	at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:387)
	at org.apache.spark.Accumulable.$plus$plus$eq(Accumulators.scala:72)
	at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:280)
	at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:278)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
	at org.apache.spark.Accumulators$.add(Accumulators.scala:278)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1226)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

I did not see an inordinate number of connections from netstat; here's a sample output at around 10 iterations.

{code}
Every 2.0s: netstat -lpn                                Thu Jul 17 11:06:09 2014

Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address       Foreign Address  State   PID/Program name
tcp        0      0 10.38.103.37:7077   0.0.0.0:*        LISTEN  17945/java
tcp        0      0 0.0.0.0:1191        0.0.0.0:*        LISTEN  5870/mmfsd
tcp        0      0 0.0.0.0:4040        0.0.0.0:*        LISTEN  19916/java
tcp        0      0 10.38.103.37:43721  0.0.0.0:*        LISTEN  19916/java
tcp        0      0 0.0.0.0:111         0.0.0.0:*        LISTEN  3385/rpcbind
tcp        0      0 0.0.0.0:8080        0.0.0.0:*        LISTEN  17945/java
tcp        0      0 0.0.0.0:42833       0.0.0.0:*        LISTEN  19916/java
tcp        0      0 0.0.0.0:37429       0.0.0.0:*        LISTEN  19916/java
tcp        0      0 0.0.0.0:22          0.0.0.0:*        LISTEN  3827/sshd
tcp        0      0 127.0.0.1:25        0.0.0.0:*        LISTEN  5937/master
tcp        0      0 127.0.0.1:55227     0.0.0.0:*        LISTEN  19907/python
tcp        0      0 127.0.0.1:37723     0.0.0.0:*        LISTEN  19916/java
tcp        0      0 0.0.0.0:6463        0.0.0.0:*        LISTEN  4688/sge_execd
tcp        0      0 0.0.0.0:35072       0.0.0.0:*        LISTEN  -
tcp        0      0 0.0.0.0:47457       0.0.0.0:*        LISTEN  3495/rpc.statd
tcp        0      0 0.0.0.0:5666        0.0.0.0:*        LISTEN  18741/nrpe
tcp        0      0 0.0.0.0:48451       0.0.0.0:*        LISTEN  19916/java
udp        0      0 0.0.0.0:41547       0.0.0.0:*                -
udp        0      0 0.0.0.0:56397       0.0.0.0:*                14452/rsyslogd
udp        0      0 0.0.0.0:51920       0.0.0.0:*                3495/rpc.statd
udp        0      0 0.0.0.0:111         0.0.0.0:*                3385/rpcbind
udp        0      0 0.0.0.0:1012        0.0.0.0:*                3385/rpcbind
udp        0      0 0.0.0.0:631         0.0.0.0:*                3324/portreserve
udp        0      0 10.38.103.37:123    0.0.0.0:*                5700/ntpd
udp        0      0 10.36.103.37:123    0.0.0.0:*                5700/ntpd
udp        0      0 127.0.0.1:123       0.0.0.0:*                5700/ntpd
udp        0      0 0.0.0.0:123         0.0.0.0:*                5700/ntpd
udp        0      0 0.0.0.0:703         0.0.0.0:*                3495/rpc.statd

Active UNIX domain sockets (only servers)
Proto RefCnt Flags  Type   State     I-Node PID/Program name  Path
unix  2     [ ACC ] STREAM LISTENING 14434  3671/sssd_nss     /var/lib/sss/pipes/nss
unix  2     [ ACC ] STREAM LISTENING 14442  3672/sssd_pam     /var/lib/sss/pipes/pam
unix  2     [ ACC ] STREAM LISTENING 21858  5937/master       public/cleanup
unix  2     [ ACC ] STREAM LISTENING 21865  5937/master       private/tlsmgr
unix  2     [ ACC ] STREAM LISTENING 21869  5937/master       private/rewrite
unix  2     [ ACC ] STREAM LISTENING 21873  5937/master       private/bounce
unix  2     [ ACC ] STREAM LISTENING 21877  5937/master       private/defer
unix  2     [ ACC ] STREAM LISTENING 21881  5937/master       private/trace
unix  2     [ ACC ] STREAM LISTENING 21885  5937/master       private/verify
unix  2     [ ACC ] STREAM LISTENING 21889  5937/master       public/flush
unix  2     [ ACC ] STREAM LISTENING 21893  5937/master       private/proxymap
unix  2     [ ACC ] STREAM LISTENING 21897  5937/master       private/proxywrite
unix  2     [ ACC ] STREAM LISTENING 21901  5937/master       private/smtp
unix  2     [ ACC ] STREAM LISTENING 21905  5937/master       private/relay
unix  2     [ ACC ] STREAM LISTENING 21909  5937/master       public/showq
unix  2     [ ACC ] STREAM LISTENING 21913  5937/master       private/error
unix  2     [ ACC ] STREAM LISTENING 21917  5937/master       private/retry
unix  2     [ ACC ] STREAM LISTENING 21921  5937/master       private/discard
unix  2     [ ACC ] STREAM LISTENING 21925  5937/master       private/local
unix  2     [ ACC ] STREAM LISTENING 21929  5937/master       private/virtual
unix  2     [ ACC ] STREAM LISTENING 21933  5937/master       private/lmtp
unix  2     [ ACC ] STREAM LISTENING 21937  5937/master       private/anvil
unix  2     [ ACC ] STREAM LISTENING 21941  5937/master       private/scache
unix  2     [ ACC ] STREAM LISTENING 23356  5870/mmfsd        /var/mmfs/mmpmon/mmpmonSocket
unix  2     [ ACC ] STREAM LISTENING 8544   1/init            @/com/ubuntu/upstart
unix  2     [ ACC ] STREAM LISTENING 14408  3669/sssd         /var/lib/sss/pipes/private/sbus-monitor
unix  2     [ ACC ] STREAM LISTENING 13457  3385/rpcbind      /var/run/rpcbind.sock
unix  2     [ ACC ] STREAM LISTENING 14967  3813/mcelog       /var/run/mcelog-client
unix  2     [ ACC ] STREAM LISTENING 13603  3477/dbus-daemon  /var/run/dbus/system_bus_socket
unix  2     [ ACC ] STREAM LISTENING 14444  3672/sssd_pam     /var/lib/sss/pipes/private/pam
unix  2     [ ACC ] STREAM LISTENING 14017  3608/hald         @/var/run/hald/dbus-bOXq61fPG8
unix  2     [ ACC ] STREAM LISTENING 21315  5740/uuidd        /var/run/uuidd/request
unix  2     [ ACC ] STREAM LISTENING 14012  3608/hald         @/var/run/hald/dbus-cI10ZZX1oL
unix  2     [ ACC ] STREAM LISTENING 14417  3670/sssd_be      /var/lib/sss/pipes/private/sbus-dp_default.3670
{code}

The cluster is composed of 32 identical Dell R620s with 2x Sandy Bridge 8-core Xeons (16 cores total per server), 128GB RAM, and 10Gb Ethernet. Our latency is ~0.1ms between all servers in the cluster. One node runs as the master, with the other 31 as slaves. Please let me know if you need more information or whether this looks like a different bug.

> PySpark crashes if too many tasks complete quickly
> --------------------------------------------------
>
>                 Key: SPARK-2282
>                 URL: https://issues.apache.org/jira/browse/SPARK-2282
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 0.9.1, 1.0.0, 1.0.1
>            Reporter: Aaron Davidson
>            Assignee: Aaron Davidson
>             Fix For: 0.9.2, 1.0.0, 1.0.1
>
> Upon every task completion, PythonAccumulatorParam constructs a new socket to the Accumulator server running inside the pyspark daemon. This can cause a buildup of used ephemeral ports from sockets in the TIME_WAIT termination state, which will cause the SparkContext to crash if too many tasks complete too quickly. We ran into this bug with 17k tasks completing in 15 seconds.
> This bug can be fixed outside of Spark by ensuring these properties are set (on a Linux server):
> echo "1" > /proc/sys/net/ipv4/tcp_tw_reuse
> echo "1" > /proc/sys/net/ipv4/tcp_tw_recycle
> or by adding the SO_REUSEADDR option to the Socket creation within Spark.

--
This message was sent by Atlassian JIRA
(v6.2#6252)
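For illustration, the SO_REUSEADDR option mentioned at the end of the issue description can be sketched in plain Python. This is a minimal sketch of the general technique (setting the option before binding/connecting so local addresses lingering in TIME_WAIT can be reused), not Spark's actual accumulator code; the `connect_with_reuseaddr` helper and its host/port arguments are hypothetical.

{code}
import socket

def connect_with_reuseaddr(host, port):
    """Open a TCP connection with SO_REUSEADDR enabled on the socket.

    Hypothetical helper: host/port stand in for the accumulator
    server endpoint, which this sketch does not know.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Must be set before the socket is bound/connected to take effect.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.connect((host, port))
    return s

# The option can be inspected without connecting anywhere:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
assert s.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) != 0
s.close()
{code}

Whether SO_REUSEADDR alone is sufficient for outbound ephemeral-port exhaustion (as opposed to the tcp_tw_reuse sysctl) may depend on the kernel; the issue description offers it as an in-Spark alternative to the sysctls.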