Hi Till,

Thanks for your help. I merged your code into my branch and the issue is gone.
BTW, do you have any insights into the first problem? "Once the yarn session
CLI can't get the cluster status (e.g. the jobmanager is down), it will try to
shut down the cluster and clean up related files even if a new jobmanager will
be created soon. As a result, YARN will fail to start a new jobmanager due to
missing files on HDFS. As a workaround, I can configure `akka.lookup.timeout`
to wait a bit longer, say 60 seconds. But I'm wondering if it will affect
other components." A rough sketch of that config change, and of the
client-side race from my first mail, is at the bottom of this message.

Thanks,
Yelei

________________________________
From: Till Rohrmann <trohrm...@apache.org>
Sent: Friday, March 24, 2017 8:09:29 AM
To: dev@flink.apache.org
Subject: Re: Question about flink client

Hi Yelei,

thanks for investigating the problem and pointing out the problematic parts.
In fact, I recently stumbled across the very same problem in the
JobClientActor and wrote a fix for it. It is already merged into the master.
I hope that this fix solves the problem you've described.

Cheers,
Till

On Wed, Mar 22, 2017 at 3:42 PM, Yelei Feng <feng...@outlook.com> wrote:

> Hi,
>
> I have two questions about the flink client in interactive mode.
>
> The first is about yarn-session.sh: once the session CLI can't get the
> cluster status (the jobmanager is down), it will try to shut down the
> cluster and clean up related files even if a new jobmanager will be created
> soon. As a result, YARN will fail to start a new jobmanager due to missing
> files on HDFS. As a workaround, I can configure `akka.lookup.timeout` to
> wait a bit longer, say 60 seconds. But I'm wondering if it will affect
> other components.
>
> The second is about the flink CLI. If the cluster goes down after
> submitting a job with 'flink run xx.jar', the CLI hangs there, only showing
> "New JobManager elected. Connecting to null" instead of cleaning up and
> closing itself.
>
> After some digging, I found the main logic is in JobClientActor. It
> receives jobmanager status changes from two sources: ZooKeeper and akka
> deathwatch. It terminates itself once it receives a `ConnectionTimeout`
> message. From ZooKeeper, the client sets the current `leaderSessionId` and
> unwatches the previous jobmanager; from akka deathwatch, it receives
> `Terminated` for the previous jobmanager and sends `ConnectionTimeout` to
> itself after 60s. There is a good chance the two paths interfere with each
> other.
>
> Situation 1:
>
> 1. the client gets notified from ZK and sets `leaderSessionId` to null
> 2. the client unwatches the previous jobmanager
> 3. the `Terminated` message for the previous jobmanager is never received
>
> Situation 2:
>
> 1. the `Terminated` message for the current jobmanager is received
> 2. a `ConnectionTimeout` message is scheduled for 60s later
> 3. the client gets notified from ZK and sets `leaderSessionId` to null in
> less than 60s
> 4. the `ConnectionTimeout` is filtered out due to the different
> `leaderSessionId`
>
> Both problems only happen in interactive mode, not in detached mode. I
> wonder whether these are issues with interactive mode, or whether we should
> only use detached mode in a production environment?
>
> Any insight is appreciated.
>
> Thanks,
>
> Yelei
>
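P.S. For reference, this is the workaround I mentioned for the first problem,
just a sketch of the line I changed in flink-conf.yaml. The 60 s value is an
arbitrary guess on my side, and I'm not sure yet whether raising it has side
effects on other components that use the same lookup timeout:

    # flink-conf.yaml (sketch of the workaround, not a recommended default)
    # Give the session CLI more time to re-resolve the jobmanager before it
    # decides the cluster is gone and cleans up the files on HDFS that a new
    # jobmanager would need.
    akka.lookup.timeout: 60 s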
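And this is a toy model of the race I described in the client. It is not the
real JobClientActor code, just my mental model of how the ZooKeeper path and
the akka deathwatch path interact; the class and method names are made up for
illustration only:

    import java.util.Objects;
    import java.util.UUID;

    // Toy model of the client-side race, NOT the real JobClientActor code.
    public class LeaderRaceSketch {

        // set from ZooKeeper leader notifications; null while no leader is known
        private UUID leaderSessionId;

        // Path 1: ZooKeeper reports a new leader (address and session id can be null).
        void onNewLeader(String address, UUID newSessionId) {
            leaderSessionId = newSessionId;
            // the previous jobmanager is also unwatched here, so its Terminated
            // message may never arrive (situation 1)
        }

        // Path 2: akka deathwatch reports the old jobmanager as dead; a
        // ConnectionTimeout carrying the old session id is scheduled for +60s.
        void onTerminated(UUID oldSessionId) {
            scheduleConnectionTimeout(oldSessionId);
        }

        // Fired 60s later; this is what should shut the client down.
        void onConnectionTimeout(UUID messageSessionId) {
            // situation 2: ZooKeeper set leaderSessionId to null in the meantime,
            // the ids no longer match, the message is dropped and the CLI hangs
            if (Objects.equals(messageSessionId, leaderSessionId)) {
                shutdownClient();
            }
        }

        private void scheduleConnectionTimeout(UUID sessionId) { /* timer omitted */ }

        private void shutdownClient() { /* terminate the actor / close the CLI */ }
    }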