[
https://issues.apache.org/jira/browse/FLINK-15087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Abdul Qadeer updated FLINK-15087:
---------------------------------
Description:
While testing, I found that a loss of connection to ZooKeeper triggers a JVM
shutdown of the JobManager when it is started through
"StandaloneSessionClusterEntrypoint". This happens due to an NPE on
"taskManagerHeartbeatManager".
When JobManagerRunner suspends jobMasterService (as the JobManager is no longer
the leader), "taskManagerHeartbeatManager" is set to null in
"stopHeartbeatServices".
Next, "AkkaRpcActor" stops the JobMaster, and an NPE is thrown in the following
method:
{code:java}
@Override
public CompletableFuture<Acknowledge> disconnectTaskManager(final ResourceID
resourceID, final Exception cause) {
log.debug("Disconnect TaskExecutor {} because: {}", resourceID,
cause.getMessage());
taskManagerHeartbeatManager.unmonitorTarget(resourceID);
slotPool.releaseTaskManager(resourceID, cause);
{code}
This eventually leads to a fatal error in "ClusterEntrypoint.onFatalError()",
which forces a JVM shutdown.
The stack trace is below:
{noformat}
{"timeMillis":1575581120723,"thread":"flink-akka.actor.default-dispatcher-93","level":"ERROR","loggerName":"com.Sample","message":"Failed
to take leadership with session id
b4662db5-f065-41d9-aaaf-78625355b251.","thrown":{"commonElementCount":0,"localizedMessage":"Failed
to take leadership with session id
b4662db5-f065-41d9-aaaf-78625355b251.","message":"Failed to take leadership
with session id
b4662db5-f065-41d9-aaaf-78625355b251.","name":"org.apache.flink.runtime.dispatcher.DispatcherException","cause":{"commonElementCount":18,"localizedMessage":"Termination
of previous JobManager for job bbb8c430787d92293e9d45c349231d9c failed. Cannot
submit job under the same job id.","message":"Termination of previous
JobManager for job bbb8c430787d92293e9d45c349231d9c failed. Cannot submit job
under the same job
id.","name":"org.apache.flink.runtime.dispatcher.DispatcherException","cause":{"commonElementCount":6,"localizedMessage":"org.apache.flink.util.FlinkException:
Could not properly shut down the
JobManagerRunner","message":"org.apache.flink.util.FlinkException: Could not
properly shut down the
JobManagerRunner","name":"java.util.concurrent.CompletionException","cause":{"commonElementCount":6,"localizedMessage":"Could
not properly shut down the JobManagerRunner","message":"Could not properly
shut down the
JobManagerRunner","name":"org.apache.flink.util.FlinkException","cause":{"commonElementCount":13,"localizedMessage":"Failure
while stopping RpcEndpoint jobmanager_0.","message":"Failure while stopping
RpcEndpoint
jobmanager_0.","name":"org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException","cause":{"commonElementCount":13,"name":"java.lang.NullPointerException","extendedStackTrace":[{"class":"org.apache.flink.runtime.jobmaster.JobMaster","method":"disconnectTaskManager","file":"JobMaster.java","line":629,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.jobmaster.JobMaster","method":"onStop","file":"JobMaster.java","line":346,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState","method":"terminate","file":"AkkaRpcActor.java","line":504,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"handleControlMessage","file":"AkkaRpcActor.java","line":170,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"onReceive","file":"AkkaRpcActor.java","line":142,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}]},"extendedStackTrace":[{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState","method":"terminate","file":"AkkaRpcActor.java","line":508,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"handleControlMessage","file":"AkkaRpcActor.java","line":170,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"onReceive","file":"AkkaRpcActor.java","line":142,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}]},"extendedStackTrace":[{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"lambda$closeAsync$0","file":"JobManagerRunner.java","line":207,"exact":false,"location":"flink-runtime_2.11-1.8.2.j
ar","version":"1.8.2"},{"class":"java.util.concurrent.CompletableFuture","method":"uniWhenComplete","file":"CompletableFuture.java","line":760,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"java.util.concurrent.CompletableFuture$UniWhenComplete","method":"tryFire","file":"CompletableFuture.java","line":736,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"java.util.concurrent.CompletableFuture","method":"postComplete","file":"CompletableFuture.java","line":474,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"java.util.concurrent.CompletableFuture","method":"completeExceptionally","file":"CompletableFuture.java","line":1977,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"postStop","file":"AkkaRpcActor.java","line":131,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor","method":"postStop","file":"FencedAkkaRpcActor.java","line":40,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"akka.actor.Actor$class","method":"aroundPostStop","file":"Actor.scala","line":515,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.UntypedActor","method":"aroundPostStop","file":"UntypedActor.scala","line":95,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.dungeon.FaultHandling$class","method":"akka$actor$dungeon$FaultHandling$$finishTerminate","file":"FaultHandling.scala","line":210,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.dungeon.FaultHandling$class","method":"terminate","file":"FaultHandling.scala","line":172,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.ActorCell","method":"terminate","file":"ActorCell.scala","line":374,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{
"class":"akka.actor.ActorCell","method":"invokeAll$1","file":"ActorCell.scala","line":467,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.ActorCell","method":"systemInvoke","file":"ActorCell.scala","line":483,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"}{noformat}
was:
While testing I found that the loss of connection with zookeeper triggers JVM
shutdown for Job Manager, when started through
"StandaloneSessionClusterEntrypoint". This happens due to a NPE on
"taskManagerHeartbeatManager."
When JobManagerRunner suspends jobMasterService (as Job manager is no longer
leader), taskManagerHeartbeatManager is set to null in "stopHeartbeatServices".
Next, "AkkaRpcActor" stops JobMaster and throws NPE in the following method:
{code:java}
@Override
public CompletableFuture<Acknowledge> disconnectTaskManager(final ResourceID
resourceID, final Exception cause) {
log.debug("Disconnect TaskExecutor {} because: {}", resourceID,
cause.getMessage());
taskManagerHeartbeatManager.unmonitorTarget(resourceID);
slotPool.releaseTaskManager(resourceID, cause);
{code}
This leads to a fatal error finally in "ClusterEntryPoint.onFatalError()" and
forces JVM shutdown.
The stack trace is below:
{noformat}
{"timeMillis":1575581120723,"thread":"flink-akka.actor.default-dispatcher-93","level":"ERROR","loggerName":"com.Sample","message":"Failed
to take leadership with session id
b4662db5-f065-41d9-aaaf-78625355b251.","thrown":{"commonElementCount":0,"localizedMessage":"Failed
to take leadership with session id
b4662db5-f065-41d9-aaaf-78625355b251.","message":"Failed to take leadership
with session id
b4662db5-f065-41d9-aaaf-78625355b251.","name":"org.apache.flink.runtime.dispatcher.DispatcherException","cause":{"commonElementCount":18,"localizedMessage":"Termination
of previous JobManager for job bbb8c430787d92293e9d45c349231d9c failed. Cannot
submit job under the same job id.","message":"Termination of previous
JobManager for job bbb8c430787d92293e9d45c349231d9c failed. Cannot submit job
under the same job
id.","name":"org.apache.flink.runtime.dispatcher.DispatcherException","cause":{"commonElementCount":6,"localizedMessage":"org.apache.flink.util.FlinkException:
Could not properly shut down the
JobManagerRunner","message":"org.apache.flink.util.FlinkException: Could not
properly shut down the
JobManagerRunner","name":"java.util.concurrent.CompletionException","cause":{"commonElementCount":6,"localizedMessage":"Could
not properly shut down the JobManagerRunner","message":"Could not properly
shut down the
JobManagerRunner","name":"org.apache.flink.util.FlinkException","cause":{"commonElementCount":13,"localizedMessage":"Failure
while stopping RpcEndpoint jobmanager_0.","message":"Failure while stopping
RpcEndpoint
jobmanager_0.","name":"org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException","cause":{"commonElementCount":13,"name":"java.lang.NullPointerException","extendedStackTrace":[{"class":"org.apache.flink.runtime.jobmaster.JobMaster","method":"disconnectTaskManager","file":"JobMaster.java","line":629,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.jobmaster.JobMaster","method":"onStop","file":"JobMaster.java","line":346,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState","method":"terminate","file":"AkkaRpcActor.java","line":504,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"handleControlMessage","file":"AkkaRpcActor.java","line":170,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"onReceive","file":"AkkaRpcActor.java","line":142,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}]},"extendedStackTrace":[{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState","method":"terminate","file":"AkkaRpcActor.java","line":508,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"handleControlMessage","file":"AkkaRpcActor.java","line":170,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"onReceive","file":"AkkaRpcActor.java","line":142,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}]},"extendedStackTrace":[{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"lambda$closeAsync$0","file":"JobManagerRunner.java","line":207,"exact":false,"location":"flink-runtime_2.11-1.8.2.j
ar","version":"1.8.2"},{"class":"java.util.concurrent.CompletableFuture","method":"uniWhenComplete","file":"CompletableFuture.java","line":760,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"java.util.concurrent.CompletableFuture$UniWhenComplete","method":"tryFire","file":"CompletableFuture.java","line":736,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"java.util.concurrent.CompletableFuture","method":"postComplete","file":"CompletableFuture.java","line":474,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"java.util.concurrent.CompletableFuture","method":"completeExceptionally","file":"CompletableFuture.java","line":1977,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"postStop","file":"AkkaRpcActor.java","line":131,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor","method":"postStop","file":"FencedAkkaRpcActor.java","line":40,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"akka.actor.Actor$class","method":"aroundPostStop","file":"Actor.scala","line":515,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.UntypedActor","method":"aroundPostStop","file":"UntypedActor.scala","line":95,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.dungeon.FaultHandling$class","method":"akka$actor$dungeon$FaultHandling$$finishTerminate","file":"FaultHandling.scala","line":210,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.dungeon.FaultHandling$class","method":"terminate","file":"FaultHandling.scala","line":172,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.ActorCell","method":"terminate","file":"ActorCell.scala","line":374,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{
"class":"akka.actor.ActorCell","method":"invokeAll$1","file":"ActorCell.scala","line":467,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.ActorCell","method":"systemInvoke","file":"ActorCell.scala","line":483,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"}{noformat}
> JobManager is forced to shutdown JVM due to temporary loss of zookeeper
> connection
> ----------------------------------------------------------------------------------
>
> Key: FLINK-15087
> URL: https://issues.apache.org/jira/browse/FLINK-15087
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.8.2
> Reporter: Abdul Qadeer
> Priority: Major
>
> While testing, I found that a loss of connection to ZooKeeper triggers a JVM
> shutdown of the JobManager when it is started through
> "StandaloneSessionClusterEntrypoint". This happens due to an NPE on
> "taskManagerHeartbeatManager".
> When JobManagerRunner suspends jobMasterService (as the JobManager is no longer
> the leader), "taskManagerHeartbeatManager" is set to null in
> "stopHeartbeatServices".
> Next, "AkkaRpcActor" stops the JobMaster, and an NPE is thrown in the following
> method:
> {code:java}
> @Override
> public CompletableFuture<Acknowledge> disconnectTaskManager(final ResourceID
> resourceID, final Exception cause) {
> log.debug("Disconnect TaskExecutor {} because: {}", resourceID,
> cause.getMessage());
> taskManagerHeartbeatManager.unmonitorTarget(resourceID);
> slotPool.releaseTaskManager(resourceID, cause);
> {code}
>
> This eventually leads to a fatal error in "ClusterEntrypoint.onFatalError()",
> which forces a JVM shutdown.
> The stack trace is below:
>
> {noformat}
> {"timeMillis":1575581120723,"thread":"flink-akka.actor.default-dispatcher-93","level":"ERROR","loggerName":"com.Sample","message":"Failed
> to take leadership with session id
> b4662db5-f065-41d9-aaaf-78625355b251.","thrown":{"commonElementCount":0,"localizedMessage":"Failed
> to take leadership with session id
> b4662db5-f065-41d9-aaaf-78625355b251.","message":"Failed to take leadership
> with session id
> b4662db5-f065-41d9-aaaf-78625355b251.","name":"org.apache.flink.runtime.dispatcher.DispatcherException","cause":{"commonElementCount":18,"localizedMessage":"Termination
> of previous JobManager for job bbb8c430787d92293e9d45c349231d9c failed.
> Cannot submit job under the same job id.","message":"Termination of previous
> JobManager for job bbb8c430787d92293e9d45c349231d9c failed. Cannot submit job
> under the same job
> id.","name":"org.apache.flink.runtime.dispatcher.DispatcherException","cause":{"commonElementCount":6,"localizedMessage":"org.apache.flink.util.FlinkException:
> Could not properly shut down the
> JobManagerRunner","message":"org.apache.flink.util.FlinkException: Could not
> properly shut down the
> JobManagerRunner","name":"java.util.concurrent.CompletionException","cause":{"commonElementCount":6,"localizedMessage":"Could
> not properly shut down the JobManagerRunner","message":"Could not properly
> shut down the
> JobManagerRunner","name":"org.apache.flink.util.FlinkException","cause":{"commonElementCount":13,"localizedMessage":"Failure
> while stopping RpcEndpoint jobmanager_0.","message":"Failure while stopping
> RpcEndpoint
> jobmanager_0.","name":"org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException","cause":{"commonElementCount":13,"name":"java.lang.NullPointerException","extendedStackTrace":[{"class":"org.apache.flink.runtime.jobmaster.JobMaster","method":"disconnectTaskManager","file":"JobMaster.java","line":629,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.jobmaster.JobMaster","method":"onStop","file":"JobMaster.java","line":346,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState","method":"terminate","file":"AkkaRpcActor.java","line":504,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"handleControlMessage","file":"AkkaRpcActor.java","line":170,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"onReceive","file":"AkkaRpcActor.java","line":142,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}]},"extendedStackTrace":[{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState","method":"terminate","file":"AkkaRpcActor.java","line":508,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"handleControlMessage","file":"AkkaRpcActor.java","line":170,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"onReceive","file":"AkkaRpcActor.java","line":142,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}]},"extendedStackTrace":[{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"lambda$closeAsync$0","file":"JobManagerRunner.java","line":207,"exact":false,"location":"flink-runtime_2.11-1.8.2
.jar","version":"1.8.2"},{"class":"java.util.concurrent.CompletableFuture","method":"uniWhenComplete","file":"CompletableFuture.java","line":760,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"java.util.concurrent.CompletableFuture$UniWhenComplete","method":"tryFire","file":"CompletableFuture.java","line":736,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"java.util.concurrent.CompletableFuture","method":"postComplete","file":"CompletableFuture.java","line":474,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"java.util.concurrent.CompletableFuture","method":"completeExceptionally","file":"CompletableFuture.java","line":1977,"exact":false,"location":"?","version":"1.8.0_66"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"postStop","file":"AkkaRpcActor.java","line":131,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor","method":"postStop","file":"FencedAkkaRpcActor.java","line":40,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"akka.actor.Actor$class","method":"aroundPostStop","file":"Actor.scala","line":515,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.UntypedActor","method":"aroundPostStop","file":"UntypedActor.scala","line":95,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.dungeon.FaultHandling$class","method":"akka$actor$dungeon$FaultHandling$$finishTerminate","file":"FaultHandling.scala","line":210,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.dungeon.FaultHandling$class","method":"terminate","file":"FaultHandling.scala","line":172,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.ActorCell","method":"terminate","file":"ActorCell.scala","line":374,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"}
,{"class":"akka.actor.ActorCell","method":"invokeAll$1","file":"ActorCell.scala","line":467,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.ActorCell","method":"systemInvoke","file":"ActorCell.scala","line":483,"exact":false,"location":"akka-actor_2.11-2.4.20.jar","version":"?"}{noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)