[
https://issues.apache.org/jira/browse/FLINK-24147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409444#comment-17409444
]
Timo Walther commented on FLINK-24147:
--------------------------------------
[~mapohl] sorry, I don't think I can contribute valuable feedback here.
> HDFS lease issues on Flink retry
> --------------------------------
>
> Key: FLINK-24147
> URL: https://issues.apache.org/jira/browse/FLINK-24147
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hadoop Compatibility
> Affects Versions: 1.14.0, 1.12.5, 1.13.2
> Reporter: Matthias
> Priority: Major
> Attachments: jobmanager.log
>
>
> This issue was brought up on the [ML thread "hdfs lease issues on flink
> retry"|https://lists.apache.org/x/thread.html/r9e5dc9cbd0a41b88565bd6c8c1c9d864ffdd343b4a96bd4dd0dd8a97@%3Cuser.flink.apache.org%3E].
> See attached [^jobmanager.log] which was provided by the user:
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE /user/p2epda/lake/delp_prod/PROD/APPROVED/data/gsam_qa_campaign_extract/NmsRtEvent/2728/temp/data/_temporary/0/_temporary/attempt__0000_r_000008_0/partMapper-r-00008.snappy.parquet for DFSClient_NONMAPREDUCE_-910267331_98 on 10.59.155.152 because this file lease is currently owned by DFSClient_NONMAPREDUCE_1747340094_77 on 10.50.196.245
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3194)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2813)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
> at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
> at org.apache.hadoop.ipc.Client.call(Client.java:1498)
> at org.apache.hadoop.ipc.Client.call(Client.java:1398)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
> at com.sun.proxy.$Proxy19.create(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:313)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:290)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:202)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:184)
> at com.sun.proxy.$Proxy20.create(Unknown Source)
> at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1828)
> at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1712)
> at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1647)
> at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:480)
> at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:476)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:491)
> at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:417)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:930)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
> at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:236)
> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
> at com.gs.ep.lake.flinkbasics.GRHadoopOutputFormat.open(GRHadoopOutputFormat.java:90)
> at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> The user ran into an {{AlreadyBeingCreatedException}} when trying to create a
> file for which a lease already existed. Admittedly, they use their own
> implementation, {{GRHadoopOutputFormat}}. But [~dmvk] helped
> investigate this, as he was familiar with the error.
> We seem to have the problem in {{HadoopOutputFormatBase}} where we use a
> fixed retry id {{0}} in
> [HadoopOutputFormatBase:137|https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java#L137].
> Each file in HDFS is allowed to have only one writer at a time. The
> LeaseManager enforces this through leases. It appears that a retried task
> tried to access the same file because {{HadoopOutputFormatBase}}
> generates the same {{TaskAttemptId}} on every attempt. The retry interval
> (10 seconds in that case) was shorter than Hadoop's hard-coded soft lease
> limit of 1 minute, so the lease of the failed attempt was still held (see
> [hadoop:HdfsConstants:62|https://github.com/apache/hadoop/blob/a9c1489e31e8f602de62bd3ecc517aa6597ab2f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java#L62]).
> We might be able to overcome this by using a dynamic retry count instead of
> the fixed {{_0}} suffix.
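
A minimal sketch of the proposal above, assuming the attempt ID keeps the shape seen in the log path ({{attempt__0000_r_000008_0}}) and only the trailing number changes. The class {{AttemptIds}} and the method {{attemptId}} are hypothetical names used for illustration and are not part of Flink or Hadoop. The attempt number would have to come from the runtime, for example Flink's {{RuntimeContext#getAttemptNumber()}}; whether that is the right source for {{HadoopOutputFormatBase}} is an assumption, not something confirmed in this issue.

```java
import java.util.Locale;

/** Hypothetical helper; not part of Flink or Hadoop. */
public final class AttemptIds {
    private AttemptIds() {}

    /**
     * Builds an attempt ID string in the shape seen in the log above,
     * but with a caller-supplied attempt number instead of a fixed 0.
     *
     * @param taskId        task ID as it should appear in the ID (8 in the log)
     * @param attemptNumber number of the current execution attempt, starting at 0
     */
    public static String attemptId(int taskId, int attemptNumber) {
        if (taskId < 0 || attemptNumber < 0) {
            throw new IllegalArgumentException(
                    "taskId and attemptNumber must be non-negative");
        }
        // The task ID is zero-padded to six digits, as in "r_000008".
        return String.format(
                Locale.ROOT, "attempt__0000_r_%06d_%d", taskId, attemptNumber);
    }

    public static void main(String[] args) {
        // First attempt and first retry of the same task get different IDs,
        // so they would no longer resolve to the same temporary directory.
        System.out.println(attemptId(8, 0));
        System.out.println(attemptId(8, 1));
    }
}
```

With distinct IDs per attempt, a retried task would write under a different {{_temporary}} sub-directory and would not request the file whose lease is still held by the failed attempt. Cleaning up the directories left behind by failed attempts is not covered by this sketch.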