Hi Yelei,

I think you're right that we should not only rely on the response of the JobManager but also ask the YARN client what the status of the application is. If the application status is something other than Succeeded, then it should be safe to shut down the cluster. If this is not the case today, it would be great if you could file an issue to fix it.
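Roughly what I have in mind is the following (an untested sketch, not the actual CLI code: it uses the Hadoop YarnClient API directly, the surrounding class and method names are made up, and it assumes the session CLI already knows its ApplicationId; it reads "safe to shut down" as "YARN reports a terminal state", which is one way to read the condition above):

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.YarnApplicationState;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public final class YarnShutdownGuard {

        // Only tear the cluster down (and delete the files on HDFS) if YARN
        // reports the application as terminally finished. A still-RUNNING
        // application whose JobManager just died may be failing over to a
        // new JobManager and must not be cleaned up.
        static boolean isSafeToShutDown(ApplicationId appId) throws Exception {
            YarnClient yarnClient = YarnClient.createYarnClient();
            yarnClient.init(new YarnConfiguration());
            yarnClient.start();
            try {
                YarnApplicationState state = yarnClient
                    .getApplicationReport(appId)
                    .getYarnApplicationState();
                return state == YarnApplicationState.FINISHED
                    || state == YarnApplicationState.FAILED
                    || state == YarnApplicationState.KILLED;
            } finally {
                yarnClient.stop();
            }
        }
    }

With such a guard in the shutdown path, an unreachable JobManager alone would no longer trigger the cleanup of the files on HDFS.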
Cheers,
Till

On Mon, Mar 27, 2017 at 9:21 AM, Yelei Feng <feng...@outlook.com> wrote:

> Hi Till,
>
> Thanks for your help. I merged your code into my branch and the issue is
> gone.
>
> BTW, do you have any insights into the first problem?
>
> "Once the yarn session CLI can't get the cluster status (e.g. the
> JobManager is down), it will try to shut down the cluster and clean up
> related files even if a new JobManager will be created soon. As a result,
> YARN will fail to start a new JobManager due to missing files on HDFS. As
> a workaround, I can configure `akka.lookup.timeout` to wait a bit longer,
> say 60 seconds. But I'm wondering if it will affect other components."
>
> Thanks,
> Yelei
> ________________________________
> From: Till Rohrmann <trohrm...@apache.org>
> Sent: Friday, March 24, 2017 8:09:29 AM
> To: dev@flink.apache.org
> Subject: Re: Question about flink client
>
> Hi Yelei,
>
> thanks for investigating the problem and pointing out the problematic
> parts. In fact, I recently stumbled across the very same problem in the
> JobClientActor and wrote a fix for it. It is already merged into master.
> I hope that this fix solves the problem you've described.
>
> Cheers,
> Till
>
> On Wed, Mar 22, 2017 at 3:42 PM, Yelei Feng <feng...@outlook.com> wrote:
>
> > Hi,
> >
> > I have two questions about the Flink client in interactive mode.
> >
> > The first is about yarn-session.sh: once the session CLI can't get the
> > cluster status (e.g. the JobManager is down), it will try to shut down
> > the cluster and clean up related files even if a new JobManager will be
> > created soon. As a result, YARN will fail to start a new JobManager due
> > to missing files on HDFS. As a workaround, I can configure
> > `akka.lookup.timeout` to wait a bit longer, say 60 seconds. But I'm
> > wondering if it will affect other components.
> >
> > The second is about the Flink CLI. If the cluster goes down after
> > submitting a job using 'flink run xx.jar', the CLI hangs there, only
> > showing "New JobManager elected. Connecting to null" instead of
> > cleaning up and closing itself.
> >
> > After some digging, I found that the main logic is in JobClientActor.
> > It receives JobManager status changes from two sources: ZooKeeper and
> > Akka deathwatch. It terminates itself once it receives a
> > `ConnectionTimeout` message. Based on ZooKeeper notifications, the
> > client sets the current `leaderSessionId` and unwatches the previous
> > JobManager; via Akka deathwatch it receives `Terminated` for the
> > previous JobManager and sends itself a `ConnectionTimeout` after 60s.
> > There is a good chance that the two interfere with each other.
> >
> > Situation 1:
> >
> > 1. The client gets notified by ZooKeeper and sets `leaderSessionId` to
> > null.
> > 2. The client unwatches the previous JobManager.
> > 3. The `Terminated` message for the previous JobManager is never
> > received.
> >
> > Situation 2:
> >
> > 1. The `Terminated` message for the current JobManager is received.
> > 2. A `ConnectionTimeout` message is scheduled for 60s later.
> > 3. The client gets notified by ZooKeeper and sets `leaderSessionId` to
> > null in less than 60s.
> > 4. The `ConnectionTimeout` is filtered out due to the mismatched
> > `leaderSessionId`.
> >
> > Both problems only happen in interactive mode, not in detached mode. I
> > wonder whether these are bugs in interactive mode, or whether we should
> > only use detached mode in production environments?
> >
> > Any insight is appreciated.
> >
> > Thanks,
> >
> > Yelei
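P.S. For the archives: the `akka.lookup.timeout` workaround described in the thread above amounts to a single entry in flink-conf.yaml, for example (10 s is the default; whether raising it affects other client-side lookups is exactly the open question above):

    akka.lookup.timeout: 60 s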