[ 
https://issues.apache.org/jira/browse/SPARK-12826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15099099#comment-15099099
 ] 

Shixiong Zhu commented on SPARK-12826:
--------------------------------------

I have a hack. Currently, onDisconnected looks like this:
{code}
  override def onDisconnected(remoteAddress: RpcAddress): Unit = {
    if (master.exists(_.address == remoteAddress)) {
      logInfo(s"$remoteAddress Disassociated !")
      masterDisconnected()
    }
  }
{code}
Here, master is the reference sent by the Master, which doesn't know its own 
correct address. However, the Worker does know the address it used to open 
the connection. So when the Worker registers with the Master successfully, 
we can also store that address (one of the addresses in 
Worker.masterRpcAddresses) and compare against it in "onDisconnected".
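
A minimal, self-contained sketch of that idea follows. It is not the actual 
Worker code: RpcAddress and MasterRef are stand-in types, and WorkerSketch, 
onRegistered, masterAddressToConnect and disconnectHandled are hypothetical 
names introduced only for illustration. It also assumes we keep the existing 
check and add the stored address as a second match, which is a choice not 
settled above.

```scala
// Stand-in types for illustration only; these are NOT Spark's real classes.
final case class RpcAddress(host: String, port: Int)
final case class MasterRef(address: RpcAddress)

class WorkerSketch(masterRpcAddresses: Seq[RpcAddress]) {
  // Reference sent by the Master; its address is whatever the Master
  // reports about itself, which may not be reachable from the Worker.
  private var master: Option[MasterRef] = None

  // Hypothetical field: the address this Worker actually connected to.
  private var masterAddressToConnect: Option[RpcAddress] = None

  // Hypothetical flag so the sketch has something observable.
  var disconnectHandled: Boolean = false

  // Called once registration succeeds; remembers which configured
  // address was used to reach the Master.
  def onRegistered(connectedVia: RpcAddress, ref: MasterRef): Unit = {
    require(masterRpcAddresses.contains(connectedVia),
      s"$connectedVia is not one of the configured master addresses")
    master = Some(ref)
    masterAddressToConnect = Some(connectedVia)
  }

  // Matches either the self-reported address (the existing check) or the
  // address the Worker connected to (the proposed extra check).
  def onDisconnected(remoteAddress: RpcAddress): Unit = {
    val matchesReported = master.exists(_.address == remoteAddress)
    val matchesConnected = masterAddressToConnect.exists(_ == remoteAddress)
    if (matchesReported || matchesConnected) {
      masterDisconnected()
    }
  }

  private def masterDisconnected(): Unit = {
    disconnectHandled = true
  }
}
```

With the addresses from the report, registering via 
RpcAddress("10.14.12.11", 7077) while the Master reports 
RpcAddress("0.0.0.0", 9682) means a disconnect from 10.14.12.11:7077 is now 
treated as a master disconnect, whereas the original check alone would 
ignore it.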

CC [~vanzin] any thoughts on this use case?

> Spark Workers do not attempt reconnect or exit on connection failure.
> ---------------------------------------------------------------------
>
>                 Key: SPARK-12826
>                 URL: https://issues.apache.org/jira/browse/SPARK-12826
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.0
>            Reporter: Alan Braithwaite
>
> Spark version 1.6.0 Hadoop 2.6.0 CDH 5.4.2
> We're running behind a tcp proxy (10.14.12.11:7077 is the tcp proxy listen 
> address in the example, upstreaming to the spark master listening on 9682 and 
> a different IP)
> To reproduce, I started a spark worker, let it successfully connect to the 
> master through the proxy, then tcpkill'd the connection on the Worker.  
> Nothing is logged from the code handling reconnection attempts.
> {code}
> 16/01/14 18:23:30 INFO Worker: Connecting to master 
> spark-master.example.com:7077...
> 16/01/14 18:23:30 DEBUG TransportClientFactory: Creating new connection to 
> spark-master.example.com/10.14.12.11:7077
> 16/01/14 18:23:30 DEBUG TransportClientFactory: Connection to 
> spark-master.example.com/10.14.12.11:7077 successful, running bootstraps...
> 16/01/14 18:23:30 DEBUG TransportClientFactory: Successfully created 
> connection to spark-master.example.com/10.14.12.11:7077 after 1 ms (0 ms 
> spent in bootstraps)
> 16/01/14 18:23:30 DEBUG Recycler: -Dio.netty.recycler.maxCapacity.default: 
> 262144
> 16/01/14 18:23:30 INFO Worker: Successfully registered with master 
> spark://0.0.0.0:9682
> 16/01/14 18:23:30 INFO Worker: Worker cleanup enabled; old application 
> directories will be deleted in: /var/lib/spark/work
> 16/01/14 18:36:52 DEBUG SecurityManager: user=null aclsEnabled=false 
> viewAcls=spark
> 16/01/14 18:36:52 DEBUG SecurityManager: user=null aclsEnabled=false 
> viewAcls=spark
> 16/01/14 18:36:57 DEBUG SecurityManager: user=null aclsEnabled=false 
> viewAcls=spark
> 16/01/14 18:36:57 DEBUG SecurityManager: user=null aclsEnabled=false 
> viewAcls=spark
> 16/01/14 18:41:31 WARN TransportChannelHandler: Exception in connection from 
> spark-master.example.com/10.14.12.11:7077
> java.io.IOException: Connection reset by peer
>       at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>       at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>       at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>       at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>       at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>       at 
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
>       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
>       at 
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>       at java.lang.Thread.run(Thread.java:745)
> -- nothing more is logged, going on 15 minutes --
> $ ag -C5 Disconn 
> core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
> 313    registrationRetryTimer.foreach(_.cancel(true))
> 314    registrationRetryTimer = None
> 315  }
> 316
> 317  private def registerWithMaster() {
> 318    // onDisconnected may be triggered multiple times, so don't attempt registration
> 319    // if there are outstanding registration attempts scheduled.
> 320    registrationRetryTimer match {
> 321      case None =>
> 322        registered = false
> 323        registerMasterFutures = tryRegisterAllMasters()
> --
> 549        finishedExecutors.values.toList, drivers.values.toList,
> 550        finishedDrivers.values.toList, activeMasterUrl, cores, memory,
> 551        coresUsed, memoryUsed, activeMasterWebUiUrl))
> 552  }
> 553
> 554  override def onDisconnected(remoteAddress: RpcAddress): Unit = {
> 555    if (master.exists(_.address == remoteAddress)) {
> 556      logInfo(s"$remoteAddress Disassociated !")
> 557      masterDisconnected()
> 558    }
> 559  }
> 560
> 561  private def masterDisconnected() {
> 562    logError("Connection to master failed! Waiting for master to reconnect...")
> 563    connected = false
> 564    registerWithMaster()
> 565  }
> 566
> {code}
> Please let me know if there is any other information I can provide.



