Hi Dyana,

your analysis is almost correct. The only part which is missing is that the
lock nodes are created as ephemeral nodes. This should ensure that if a JM
process dies that the lock nodes will get removed by ZooKeeper. It depends
a bit on ZooKeeper's configuration how long it takes until Zk detects a
client connection as lost and then removes the ephemeral nodes. If the job
should terminate within this time interval, then it could happen that you
cannot remove the checkpoint/JobGraph. However, usually the Zookeeper
session timeout should be configured to be a couple of seconds.

I would actually be interested in better understanding your problem to see
whether this is still a bug in Flink. Could you maybe share the respective
logs on DEBUG log level with me? Maybe it would also be possible to run the
latest version of Flink (1.7.2) to include all possible bug fixes.

FYI: The community is currently discussing to reimplement the ZooKeeper
based high availability services [1]. One idea is to get rid of the lock
nodes by replacing them with transactions on the leader node. This could
prevent these kind of bugs in the future.

[1] https://issues.apache.org/jira/browse/FLINK-10333

Cheers,
Till

On Thu, Apr 18, 2019 at 3:12 PM dyana.rose <dyana.r...@salecycle.com> wrote:

> Flink v1.7.1
>
> After a Flink reboot we've been seeing some unexpected issues with excess
> retained checkpoints not being able to be removed from ZooKeeper after a
> new checkpoint is created.
>
> I believe I've got my head around the role of ZK and lockNodes in
> Checkpointing after going through the code. Could you check my logic on
> this and add any insight, especially if I've got it wrong?
>
> The situation:
> 1) Say we run JM1 and JM2 and retain 10 checkpoints and are running in HA
> with S3 as the backing store.
>
> 2) JM1 and JM2 start up and each instance of ZooKeeperStateHandleStore has
> its own lockNode UUID. JM1 is elected leader.
>
> 3) We submit a job, that JobGraph lockNode is added to ZK using JM1's
> JobGraph lockNode.
>
> 4) Checkpoints start rolling in, latest 10 are retained in ZK using JM1's
> checkpoint lockNode. We continue running, and checkpoints are successfully
> being created and excess checkpoints removed.
>
> 5) Both JM1 and JM2 now are rebooted.
>
> 6) The JobGraph is recovered by the leader, the job restarts from the
> latest checkpoint.
>
> Now after every new checkpoint we see in the ZooKeeper logs:
> INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@653] - Got
> user-level KeeperException when processing sessionid:0x10000047715000d
> type:delete cxid:0x210 zxid:0x700001091 txntype:-1 reqpath:n/a Error
> Path:/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/0000000000000057813
> Error:KeeperErrorCode = Directory not empty for
> /flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/000000000000005781
> with an increasing checkpoint id on each subsequent call.
>
> When JM1 and JM2 were rebooted the lockNode UUIDs would have rolled,
> right? As the old checkpoints were created under the old UUID, the new JMs
> will never be able to remove the old retained checkpoints from ZooKeeper.
>
> Is that correct?
>
> If so, would this also happen with JobGraphs in the following situation
> (we saw this just recently where we had a JobGraph for a cancelled job
> still in ZK):
>
> Steps 1 through 3 above, then:
> 4) JM1 fails over to JM2, the job keeps running uninterrupted. JM1
> restarts.
>
> 5) some time later while JM2 is still leader we hard cancel the job and
> restart the JMs
>
> In this case JM2 would successfully remove the job from s3, but because
> its lockNode is different from JM1 it cannot delete the lock file in the
> jobgraph folder and so can’t remove the jobgraph. Then Flink restarts and
> tries to process the JobGraph it has found, but the S3 files have been
> deleted.
>
> Possible related closed issues (fixes went in v1.7.0):
> https://issues.apache.org/jira/browse/FLINK-10184 and
> https://issues.apache.org/jira/browse/FLINK-10255
>
> Thanks for any insight,
> Dyana
>

Reply via email to