Hello,

When I start a Spark notebook, it causes some of the servers to exhaust some
Linux kernel resource, to the point that I can't even ssh to those nodes.

And it's not because the servers are being hammered: it happens even when no
Spark jobs/tasks are running. To reproduce the problem, it's enough to just
start a Spark notebook (and keep the Spark context up).
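
For what it's worth, the reproduction really is just the following. This is a
minimal PySpark sketch; the app name and the executor settings are assumptions
standing in for our real setup, not exact values.

import time
from pyspark import SparkConf, SparkContext

# Start a Spark 1.5 context on YARN and let it sit idle.
conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("idle-notebook-repro")   # hypothetical app name
        .set("spark.executor.instances", "150")
        .set("spark.executor.cores", "2"))
sc = SparkContext(conf=conf)

# Run no jobs at all: just keep the context (and its executors) alive.
# The open-connection count on the nodes climbs anyway.
time.sleep(3600)
sc.stop()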

Spark version: 1.5, but I had this problem with previous versions of Spark too.
Hadoop 2.6, Spark on YARN.
There are 150 containers, each with 2 vcores.

There is plenty of memory (YARN reserved 1.3 TB of memory across 10 nodes
for those 150 containers). GC is minuscule. There are plenty of cores in the
system too (24-48 cores per node). The servers run RHEL 6.7,
on kernel 2.6.32-573.22.1.el6.x86_64.

We have monitoring running on the servers.
The number of open connections jumps to 2500-4000 when Spark is up;
it's only around 200 when Spark is not running.
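
For concreteness, the count is roughly what you'd get by tallying socket
entries under /proc/net; our monitoring agent is different, but this sketch
counts the same thing:

# Count open TCP sockets on a node: one line per socket in
# /proc/net/tcp (and tcp6), with the first line being a header.
def count_tcp_sockets():
    total = 0
    for path in ("/proc/net/tcp", "/proc/net/tcp6"):
        try:
            with open(path) as f:
                total += sum(1 for _ in f) - 1  # skip the header line
        except IOError:
            pass  # tolerate a missing table
    return total

print(count_tcp_sockets())  # ~200 without Spark, 2500-4000 with it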

I have tried bumping up many of the limits in /etc/sysctl.conf that we
might be hitting; see *[1]* for the current set of non-default sysctl
settings. With those parameters increased, things feel better,
but the problem still happens.
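
A quick way to confirm those values actually took effect: every sysctl key is
mirrored under /proc/sys with dots replaced by slashes. A small sketch, with
expected values taken from *[1]*:

# Read the effective value of a sysctl key from /proc/sys.
def sysctl_get(key):
    with open("/proc/sys/" + key.replace(".", "/")) as f:
        return f.read().strip()

print(sysctl_get("net.core.somaxconn"))            # expect "2048"
print(sysctl_get("net.ipv4.ip_local_port_range"))  # expect "2000  65535" (tab-separated)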

After some time of running tasks, jobs start to fail. It looks like there
might be some leaking netty connections or something like that?
Jobs fail with a "Failed to connect to..." error - see *[2]*. But that happens
much later; it's the apogee of the problem, the point where it starts affecting
jobs. Before that, Spark itself exhausts some resources, which we can see
because we can't even ssh to some of the servers.
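
In case it's relevant: these are the connection-related Spark knobs I'm
planning to experiment with next. As far as I can tell these property names
exist in Spark 1.5, but the values are guesses on my part, not a confirmed fix.

from pyspark import SparkConf

conf = (SparkConf()
        # Global network timeout; also the default for shuffle timeouts.
        .set("spark.network.timeout", "300s")
        # Timeout for establishing shuffle connections specifically.
        .set("spark.shuffle.io.connectionTimeout", "300s")
        # Connections kept per peer; 1 is the default, and raising it
        # would multiply sockets, so I'd leave it alone.
        .set("spark.shuffle.io.numConnectionsPerPeer", "1"))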

I also tuned up the ulimits on the system, but there was no change in the
behavior. The problem is always reproducible.
See the limits.d conf file in *[3]*.
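
One sanity check I've been using (sketch below): limits.d changes only apply
to newly started sessions, so I read /proc/<pid>/limits of a running executor
JVM to confirm it actually inherited the raised values.

# Print the limits a running process actually has.
def read_limits(pid):
    with open("/proc/%d/limits" % pid) as f:
        return f.read()

print(read_limits(12345))  # 12345 is a placeholder; use a real executor pid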

We only have this problem when Spark is running. Hive etc. runs just fine.

Any ideas?

Thank you.



*[1]* Current set of non-default sysctl settings:

> net.ipv4.ip_forward = 0
> net.ipv4.conf.default.rp_filter = 1
> net.ipv4.conf.default.accept_source_route = 0
> kernel.sysrq = 0
> kernel.core_uses_pid = 1
> net.ipv4.tcp_syncookies = 1
> net.bridge.bridge-nf-call-ip6tables = 0
> net.bridge.bridge-nf-call-iptables = 0
> net.bridge.bridge-nf-call-arptables = 0
> kernel.msgmnb = 65536
> kernel.msgmax = 65536
> kernel.shmmax = 68719476736
> net.ipv6.conf.all.disable_ipv6=1
> net.ipv6.conf.default.disable_ipv6=1
> net.ipv6.conf.lo.disable_ipv6=1
> vm.swappiness = 4
> net.ipv4.ip_local_port_range = 2000 65535
> net.ipv4.tcp_tw_reuse = 1
> net.core.somaxconn = 2048
> net.ipv4.tcp_synack_retries = 3
> net.ipv4.tcp_fin_timeout = 20
> net.ipv4.tcp_max_syn_backlog = 8096
> net.core.netdev_max_backlog = 16000
> net.core.rmem_max = 134217728
> net.core.wmem_max = 134217728
> net.ipv4.tcp_rmem = 4096 87380 134217728
> net.ipv4.tcp_wmem = 4096 65536 134217728
> net.core.rmem_default = 262144
> net.core.wmem_default = 262144



*[2]* Error stack:

> An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 101 in stage 54.0 failed 4 times, most recent failure: Lost task 101.3 in stage 54.0 (TID 18504, abc.def.com): java.io.IOException: Failed to connect to abc.def.com/10.20.32.118:23078
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>     at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>     at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: abc.def.com/10.20.32.118:23078
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>     at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>     at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>     ... 1 more
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1294)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1282)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1281)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1281)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1507)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1469)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
>     at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
>     at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:373)
>     at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>     at py4j.Gateway.invoke(Gateway.java:259)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to connect to abc.def.com/10.20.32.118:23078
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>     at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>     at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     ... 1 more
> Caused by: java.net.ConnectException: Connection refused: abc.def.com/10.20.32.118:23078
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>     at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>     at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>     ... 1 more



*[3]* /etc/security/limits.d conf file:

> $ cat heavy-mr-jobs.conf
> *       -   memlock     256
> *       -   msgqueue    1600
> *       -   nofile      16384
> *       -   nproc       2048
> *       -   stack       40560
