[
https://issues.apache.org/jira/browse/FLINK-24147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias updated FLINK-24147:
-----------------------------
Description:
This issue was brought up on the [ML thread "hdfs lease issues on flink
retry"|https://lists.apache.org/x/thread.html/r9e5dc9cbd0a41b88565bd6c8c1c9d864ffdd343b4a96bd4dd0dd8a97@%3Cuser.flink.apache.org%3E].
See attached [^jobmanager.log] which was provided by the user:
{code}
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE /user/p2epda/lake/delp_prod/PROD/APPROVED/data/gsam_qa_campaign_extract/NmsRtEvent/2728/temp/data/_temporary/0/_temporary/attempt__0000_r_000008_0/partMapper-r-00008.snappy.parquet for DFSClient_NONMAPREDUCE_-910267331_98 on 10.59.155.152 because this file lease is currently owned by DFSClient_NONMAPREDUCE_1747340094_77 on 10.50.196.245
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3194)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2813)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
    at org.apache.hadoop.ipc.Client.call(Client.java:1498)
    at org.apache.hadoop.ipc.Client.call(Client.java:1398)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy19.create(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:313)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:290)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:202)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:184)
    at com.sun.proxy.$Proxy20.create(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1828)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1712)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1647)
    at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:480)
    at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:476)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:491)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:417)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:930)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:236)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
    at com.gs.ep.lake.flinkbasics.GRHadoopOutputFormat.open(GRHadoopOutputFormat.java:90)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:745)
{code}
The user ran into the {{AlreadyBeingCreatedException}} shown in the log when
their job tried to create a file for which a lease already existed. Admittedly,
they use their own implementation, {{GRHadoopOutputFormat}}, but [~dmvk] helped
investigate this because he was familiar with the error.
We seem to have the problem in {{HadoopOutputFormatBase}} where we use a fixed
retry id {{0}} in
[HadoopOutputFormatBase:137|https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java#L137].
Each resource in HDFS may have only one writer accessing it at a time. The
LeaseManager enforces this through leases. It appears that we tried to access
the same file through another task because {{HadoopOutputFormatBase}} generated
the same {{TaskAttemptId}} again. The retry interval (10 seconds in this case)
was shorter than Hadoop's hard-coded soft lease limit of 1 minute (see
[hadoop:HdfsConstants:62|https://github.com/apache/hadoop/blob/a9c1489e31e8f602de62bd3ecc517aa6597ab2f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java#L62]).
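To illustrate the timing, here is a toy model of single-writer leases. It is
only a sketch and not the HDFS LeaseManager implementation: the class
{{LeaseSketch}}, the method {{tryCreate}}, the client names and the path are
all hypothetical, and real lease recovery after the soft limit is more involved
than the simple takeover modelled here.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Toy model of single-writer leases; NOT the HDFS LeaseManager. */
class LeaseSketch {
    // Soft lease limit of one minute, as stated in the issue description.
    static final Duration SOFT_LIMIT = Duration.ofMinutes(1);

    private static final class Lease {
        final String holder;
        final long acquiredAtMillis;

        Lease(String holder, long acquiredAtMillis) {
            this.holder = holder;
            this.acquiredAtMillis = acquiredAtMillis;
        }
    }

    private final Map<String, Lease> leases = new HashMap<>();

    /**
     * Returns false if another holder's lease on the path is still within the
     * soft limit; otherwise records the caller as the new holder and returns true.
     */
    boolean tryCreate(String path, String holder, long nowMillis) {
        Lease current = leases.get(path);
        boolean heldByOther = current != null && !current.holder.equals(holder);
        boolean withinSoftLimit = current != null
                && nowMillis - current.acquiredAtMillis < SOFT_LIMIT.toMillis();
        if (heldByOther && withinSoftLimit) {
            return false; // corresponds to the rejected create in the log
        }
        leases.put(path, new Lease(holder, nowMillis));
        return true;
    }

    public static void main(String[] args) {
        LeaseSketch nameNode = new LeaseSketch();
        // Hypothetical path; both attempts resolve to it because the attempt id is fixed.
        String path = "/tmp/_temporary/attempt__0000_r_000008_0/part.parquet";
        System.out.println(nameNode.tryCreate(path, "client-A", 0L));      // first attempt
        System.out.println(nameNode.tryCreate(path, "client-B", 10_000L)); // retry after 10 s
        System.out.println(nameNode.tryCreate(path, "client-B", 61_000L)); // retry after 61 s
    }
}
```

In this model the retry after 10 seconds is rejected because the first
client's lease is younger than the soft limit, while a retry after the limit
has passed is accepted.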
We might be able to overcome this by using a dynamic retry count instead of the
fixed {{_0}}.
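A minimal sketch of that idea, assuming the attempt id string follows the
pattern visible in the log path ({{attempt__0000_r_000008_0}}). The class
{{AttemptIdSketch}} and the method {{attemptId}} are hypothetical and are not
the code in {{HadoopOutputFormatBase}}; in Flink the attempt number could
presumably come from {{RuntimeContext#getAttemptNumber()}}, but that wiring is
an assumption and is not shown here.

```java
/** Hypothetical helper that builds an attempt id string with a variable attempt suffix. */
class AttemptIdSketch {

    /**
     * Formats an id like "attempt__0000_r_000008_0".
     * taskId is zero-padded to six digits; attemptNumber replaces the fixed "_0".
     */
    static String attemptId(int taskId, int attemptNumber) {
        return String.format("attempt__0000_r_%06d_%d", taskId, attemptNumber);
    }

    public static void main(String[] args) {
        // With a fixed suffix, every retry of task 8 maps to the same id and
        // therefore to the same temporary directory.
        System.out.println(attemptId(8, 0));
        // With a dynamic suffix, the first retry gets a different id, so it
        // would write to a different temporary path.
        System.out.println(attemptId(8, 1));
    }
}
```

Because the attempt id is part of the temporary directory name, a different
suffix per attempt would keep a retried task from requesting the file whose
lease is still held by the failed attempt.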
was:
This issue was brought up on the [ML thread "hdfs lease issues on flink
retry"|https://lists.apache.org/x/thread.html/r9e5dc9cbd0a41b88565bd6c8c1c9d864ffdd343b4a96bd4dd0dd8a97@%3Cuser.flink.apache.org%3E].
See attached [^jobmanager.log] which was provided by the user.
The user ran into {{FileAlreadyExistsException}} when it tried to create a file
for which a lease already existed. [~dmvk] helped investigating this.
The problem seems to be that we use a fixed retry id {{0}} in
[HadoopOutputFormatBase:137|https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java#L137].
Each resource in HDFS is allowed to have only one Writer accessing it. The
LeaseManager manages this through leases. It appears that we tried to access
the same file through another task due to {{HadoopOutputFormatBase}} generating
the same {{TaskAttemptId}}. The retry interval was shorter (in that case 10
seconds) than Hadoop's hard-coded soft lease limit of 1min (see
[hadoop:HdfsConstants:62|https://github.com/apache/hadoop/blob/a9c1489e31e8f602de62bd3ecc517aa6597ab2f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java#L62]).
We could be able to overcome this by adding a dynamic retry count instead of
{{_0}}.
> HDFS lease issues on Flink retry
> --------------------------------
>
> Key: FLINK-24147
> URL: https://issues.apache.org/jira/browse/FLINK-24147
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hadoop Compatibility
> Affects Versions: 1.14.0, 1.12.5, 1.13.2
> Reporter: Matthias
> Priority: Major
> Attachments: jobmanager.log
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)