[ https://issues.apache.org/jira/browse/FLINK-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813325#comment-16813325 ]
Andrey Zagrebin commented on FLINK-8902:
----------------------------------------
I managed to reproduce it quite quickly locally with a standalone setup and the
embedded bin/zookeeper.sh to enable HA.
The reason seems to be the following.
JobMaster.rescaleOperators takes a savepoint in getJobModificationSavepoint,
which eventually calls ZooKeeperCompletedCheckpointStore.addCheckpoint. This
creates a node and a lock for the savepoint in ZooKeeperStateHandleStore.
Then JobMaster.rescaleOperators restores the new execution graph in
restoreExecutionGraphFromRescalingSavepoint, which eventually calls
ZooKeeperCompletedCheckpointStore.addCheckpoint for the savepoint again. The
problem is that although this happens in another
ZooKeeperCompletedCheckpointStore instance, it uses the same underlying
ZooKeeper storage, where a node with the same checkpoint id already exists.
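As a rough illustration (this is not the actual Flink code, and the exact path
format is my approximation), the node name is derived from the checkpoint id
alone, so any two store instances that share the same HA root resolve the
savepoint to the same znode:
{code:java}
// Sketch only: approximate derivation of the checkpoint node name.
// Nothing instance-specific goes into the name, just the checkpoint id.
public class CheckpointPathSketch {

    static String checkpointIdToPath(long checkpointId) {
        // zero-padded id, e.g. 42 -> /0000000000000000042
        return String.format("/%019d", checkpointId);
    }

    public static void main(String[] args) {
        // Two independent checkpoint store instances, same (illustrative) HA root, same id:
        String haRoot = "/flink/default/checkpoints";
        System.out.println(haRoot + checkpointIdToPath(42));
        System.out.println(haRoot + checkpointIdToPath(42)); // identical path -> collision
    }
}
{code}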
In the non-HA scenario we need to import the savepoint into the new checkpoint
storage of the rescaled execution graph, but in HA mode this turns out to be the
same underlying ZooKeeper storage.
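Outside of Flink, the collision can be shown with a few lines of plain Curator
(a minimal sketch, assuming a local ZooKeeper on localhost:2181 and the unshaded
Curator client on the classpath; the node path is illustrative): the second
create on the already existing node fails with NodeExists, which
ZooKeeperStateHandleStore.addAndLock then surfaces as the
ConcurrentModificationException seen in the stacktrace below.
{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.KeeperException;

public class NodeExistsSketch {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Same node both checkpoint store instances resolve to (see the sketch above).
        String savepointNode = "/flink/default/checkpoints/0000000000000000042";

        // First addCheckpoint (taking the rescaling savepoint): the node is created.
        client.create().creatingParentsIfNeeded()
                .forPath(savepointNode, "state-handle-1".getBytes());

        // Second addCheckpoint (restoring into the rescaled execution graph):
        // same underlying ZooKeeper storage, same node -> NodeExists.
        try {
            client.create().forPath(savepointNode, "state-handle-2".getBytes());
        } catch (KeeperException.NodeExistsException e) {
            System.out.println("second create failed: " + e.getMessage());
        }

        client.close();
    }
}
{code}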
> Re-scaling job sporadically fails with KeeperException
> ------------------------------------------------------
>
> Key: FLINK-8902
> URL: https://issues.apache.org/jira/browse/FLINK-8902
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.5.0, 1.6.0
> Environment: Commit: 80020cb
> Hadoop: 2.8.3
> YARN
>
> Reporter: Gary Yao
> Assignee: Andrey Zagrebin
> Priority: Critical
> Labels: flip6
> Fix For: 1.7.3, 1.6.5
>
>
> *Description*
> Re-scaling a job with {{bin/flink modify -p <new_parallelism>}} sporadically
> fails with a {{KeeperException}}.
> *Steps to reproduce*
> # Submit a job to a Flink cluster with flip6 enabled, running on YARN (session mode).
> # Re-scale the job (5-20 times).
> *Stacktrace (client)*
> {noformat}
> org.apache.flink.util.FlinkException: Could not rescale job 61e2e99db2e959ebd94e40f9c5e816bc.
>     at org.apache.flink.client.cli.CliFrontend.lambda$modify$8(CliFrontend.java:766)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:954)
>     at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:757)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1037)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1095)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1095)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could not restore from temporary rescaling savepoint. This might indicate that the savepoint hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got corrupted. Deleting this savepoint as a precaution.
>     at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$3(JobMaster.java:525)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:295)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:150)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>     at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could not restore from temporary rescaling savepoint. This might indicate that the savepoint hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got corrupted. Deleting this savepoint as a precaution.
>     at org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$17(JobMaster.java:1317)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.ConcurrentModificationException: ZooKeeper unexpectedly modified
>     at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:168)
>     at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.addCheckpoint(ZooKeeperCompletedCheckpointStore.java:233)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1088)
>     at org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1161)
>     at org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$17(JobMaster.java:1297)
>     ... 10 more
> Caused by: org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
>     at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
>     at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1006)
>     at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:910)
>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.doOperation(CuratorTransactionImpl.java:159)
>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.access$200(CuratorTransactionImpl.java:44)
>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:129)
>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:125)
>     at org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)
>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.commit(CuratorTransactionImpl.java:122)
>     at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:162)
>     ... 14 more
> {noformat}