[
https://issues.apache.org/jira/browse/FLINK-21250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17278709#comment-17278709
]
Till Rohrmann edited comment on FLINK-21250 at 2/4/21, 9:38 AM:
----------------------------------------------------------------
Are you using Flink's HA services? Flink can only tolerate JobMaster faults if
you have configured one of Flink's HA services (ZooKeeper or Kubernetes). These
services are used to store required information like the last checkpoint id. If
you are using ZooKeeper, for example, then you could check the ZooKeeper logs
to see why the checkpoint id is not monotonically increasing.
You can find more information in the [HA
documentation|https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/].
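For reference, a minimal ZooKeeper HA configuration in flink-conf.yaml might look
like the following sketch (quorum hosts, storage directory, and cluster id are
placeholders, not values taken from this issue):
{code}
# Enable ZooKeeper-based high availability services.
high-availability: zookeeper
# ZooKeeper quorum that stores JobMaster metadata such as the checkpoint
# counter (by default under the /checkpoint-counter node).
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
# Durable storage for HA data, e.g. pointers to completed checkpoints.
high-availability.storageDir: hdfs:///flink/ha/
# Isolates this cluster's HA data from other clusters using the same quorum.
high-availability.cluster-id: /my-flink-cluster
{code}
Without such a configuration the last checkpoint id is not persisted, so a
recovered JobMaster can reuse ids and collide with existing chk-NNNNN
directories.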
was (Author: till.rohrmann):
Are you using Flink's HA services? Flink can only tolerate JobMaster faults if
you have configured one of Flink's HA services (ZooKeeper or Kubernetes). These
services are used to store required information like the last checkpoint id. If
you are using ZooKeeper, for example, then you could check the ZooKeeper logs
to see why the checkpoint id is not monotonically increasing.
> Failure to finalize checkpoint due to
> org.apache.hadoop.fs.FileAlreadyExistsException:
> /user/flink/checkpoints/9e36ed4ec2f6685f836e5ee5395f5f2e/chk-11096/_metadata
> for client xxx.xx.xx.xxx already exists
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-21250
> URL: https://issues.apache.org/jira/browse/FLINK-21250
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.10.1
> Reporter: Fangliang Liu
> Priority: Major
> Attachments: image-2021-02-03-19-39-19-611.png,
> image-2021-02-04-11-33-09-549.png
>
>
> Flink Version: 1.10.1
> The following exception is occasionally thrown while the Flink job runs on
> YARN. After the first occurrence, every subsequent checkpoint fails.
> {code:java}
> level: WARN
> location: org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:796)
> log: 2021-02-03 15:39:03,447 bigbase_kafka_to_hive_push_push_user_date_active WARN org.apache.flink.runtime.jobmaster.JobMaster - Error while processing checkpoint acknowledgement message
> message: Error while processing checkpoint acknowledgement message
> thread: jobmanager-future-thread-34
> throwable: org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 11096. Failure reason: Failure to finalize checkpoint.
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:863)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:781)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:794)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /user/flink/checkpoints/140ba889671225dea822a4e1f569379a/chk-11096/_metadata for client 172.xx.xx.xxx already exists
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:3021)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2908)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2792)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:615)
>     at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:117)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:413)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2278)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2274)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2272)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>     at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1841)
>     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1698)
>     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1633)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:891)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:788)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:274)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:854)
>     ... 9 more
> Caused by: org.apache.hadoop.ipc.RemoteException: /user/flink/checkpoints/140ba889671225dea822a4e1f569379a/chk-11096/_metadata for client 172.xx.xx.xxx already exists
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:3021)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2908)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2792)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:615)
>     at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:117)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:413)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2278)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2274)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2272)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1476)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1413)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>     at com.sun.proxy.$Proxy25.create(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296)
>     at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy26.create(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1836)
>     ... 25 more
> {code}
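> The innermost Flink frames above (FsCheckpointMetadataOutputStream, created via
> HadoopFileSystem.create) write the _metadata file with non-overwriting
> semantics. A minimal, hypothetical reproduction of the failing call (not the
> actual Flink source; Flink passes WriteMode.NO_OVERWRITE, for which
> overwrite=false below is the Hadoop-level equivalent):
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> public class MetadataCreateSketch {
>     public static void main(String[] args) throws Exception {
>         FileSystem fs = FileSystem.get(new Configuration());
>         // Path taken from the stack trace above.
>         Path metadata = new Path(
>                 "/user/flink/checkpoints/140ba889671225dea822a4e1f569379a/chk-11096/_metadata");
>         // overwrite = false: if a _metadata file from an earlier finalization
>         // attempt of the same checkpoint id already exists, the NameNode
>         // rejects the create with FileAlreadyExistsException.
>         fs.create(metadata, false).close();
>     }
> }
> {code}
> Once a checkpoint id is reused (for example after a JobMaster recovery without
> a persisted checkpoint counter), finalization collides with the existing
> chk-NNNNN directory and fails in exactly this way.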
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)