[ https://issues.apache.org/jira/browse/FLINK-24147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias updated FLINK-24147:
-----------------------------
    Description: 
This issue was brought up on the [ML thread "hdfs lease issues on flink 
retry"|https://lists.apache.org/x/thread.html/r9e5dc9cbd0a41b88565bd6c8c1c9d864ffdd343b4a96bd4dd0dd8a97@%3Cuser.flink.apache.org%3E].
See the attached [^jobmanager.log], which was provided by the user:
{code}
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE /user/p2epda/lake/delp_prod/PROD/APPROVED/data/gsam_qa_campaign_extract/NmsRtEvent/2728/temp/data/_temporary/0/_temporary/attempt__0000_r_000008_0/partMapper-r-00008.snappy.parquet for DFSClient_NONMAPREDUCE_-910267331_98 on 10.59.155.152 because this file lease is currently owned by DFSClient_NONMAPREDUCE_1747340094_77 on 10.50.196.245
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3194)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2813)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
        at org.apache.hadoop.ipc.Client.call(Client.java:1498)
        at org.apache.hadoop.ipc.Client.call(Client.java:1398)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
        at com.sun.proxy.$Proxy19.create(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:313)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:290)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:202)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:184)
        at com.sun.proxy.$Proxy20.create(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1828)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1712)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1647)
        at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:480)
        at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:476)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:491)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:417)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:930)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
        at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:236)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
        at com.gs.ep.lake.flinkbasics.GRHadoopOutputFormat.open(GRHadoopOutputFormat.java:90)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:745)
{code}
The user ran into an {{AlreadyBeingCreatedException}} when their job tried to 
create a file for which a lease already existed. Admittedly, they use their own 
implementation, {{GRHadoopOutputFormat}}. But [~dmvk] helped investigate this, 
as he was familiar with the error.

The problem seems to be in {{HadoopOutputFormatBase}}, where we use a fixed 
retry id {{0}} in 
[HadoopOutputFormatBase:137|https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java#L137].
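
For illustration, here is a minimal sketch of that pattern (paraphrased, not the 
exact Flink source): the attempt id is built from the subtask number with a 
hard-coded {{_0}} attempt suffix, so every execution attempt of the same subtask 
produces the identical id, matching the {{attempt__0000_r_000008_0}} visible in 
the stack trace above.
{code}
import org.apache.hadoop.mapred.TaskAttemptID;

public class FixedAttemptIdSketch {

    // Paraphrased from HadoopOutputFormatBase#open: the subtask number is
    // zero-padded to six digits and the attempt counter is hard-coded to "_0",
    // so a restarted subtask reuses the id of the failed attempt.
    static TaskAttemptID attemptIdFor(int taskNumber) {
        return TaskAttemptID.forName(
                "attempt__0000_r_" + String.format("%06d", taskNumber + 1) + "_0");
    }

    public static void main(String[] args) {
        System.out.println(attemptIdFor(7)); // attempt__0000_r_000008_0 (first execution)
        System.out.println(attemptIdFor(7)); // attempt__0000_r_000008_0 (retry: same id, same work path)
    }
}
{code}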

Each file in HDFS is allowed to have only one writer accessing it at a time; the 
{{LeaseManager}} enforces this through leases. It appears that the retried task 
tried to access the same file as the failed attempt because 
{{HadoopOutputFormatBase}} generates the same {{TaskAttemptId}} for both. The 
retry interval (in this case 10 seconds) was shorter than Hadoop's hard-coded 
soft lease limit of 1 minute (see 
[hadoop:HdfsConstants:62|https://github.com/apache/hadoop/blob/a9c1489e31e8f602de62bd3ecc517aa6597ab2f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java#L62]), 
so the old lease had not yet expired when the retry attempted to create the file.

We should be able to overcome this by using a dynamic retry count instead of 
the hard-coded {{_0}} suffix.
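
A minimal sketch of that idea, assuming the attempt counter could be taken from 
Flink's {{RuntimeContext#getAttemptNumber()}} (0 for the first execution, 
increasing with each restart); where the counter ultimately comes from is an 
implementation detail still to be settled:
{code}
import org.apache.hadoop.mapred.TaskAttemptID;

public class DynamicAttemptIdSketch {

    // Sketch only: thread a dynamic attempt counter into the id instead of the
    // hard-coded "_0". In HadoopOutputFormatBase this counter could e.g. come
    // from RuntimeContext#getAttemptNumber() (an assumption, not a settled design).
    static TaskAttemptID attemptIdFor(int taskNumber, int attemptNumber) {
        return TaskAttemptID.forName(
                "attempt__0000_r_" + String.format("%06d", taskNumber + 1) + "_" + attemptNumber);
    }

    public static void main(String[] args) {
        System.out.println(attemptIdFor(7, 0)); // attempt__0000_r_000008_0 (first execution)
        System.out.println(attemptIdFor(7, 1)); // attempt__0000_r_000008_1 (retry gets its own id)
    }
}
{code}
With a per-attempt id, a retried subtask should write into its own 
{{_temporary/.../attempt__0000_r_000008_1}} work directory instead of competing 
for the lease on the file created by the previous attempt.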

  was:
This issue was brought up on the [ML thread "hdfs lease issues on flink 
retry"|https://lists.apache.org/x/thread.html/r9e5dc9cbd0a41b88565bd6c8c1c9d864ffdd343b4a96bd4dd0dd8a97@%3Cuser.flink.apache.org%3E].
 See attached  [^jobmanager.log] which was provided by the user.

The user ran into {{FileAlreadyExistsException}} when it tried to create a file 
for which a lease already existed. [~dmvk] helped investigating this.

The problem seems to be that we use a fixed retry id {{0}} in 
[HadoopOutputFormatBase:137|https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java#L137].

Each resource in HDFS is allowed to have only one Writer accessing it. The 
LeaseManager manages this through leases. It appears that we tried to access 
the same file through another task due to {{HadoopOutputFormatBase}} generating 
the same {{TaskAttemptId}}. The retry interval was shorter (in that case 10 
seconds) than Hadoop's hard-coded soft lease limit of 1min (see 
[hadoop:HdfsConstants:62|https://github.com/apache/hadoop/blob/a9c1489e31e8f602de62bd3ecc517aa6597ab2f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java#L62]).

We could be able to overcome this by adding a dynamic retry count instead of 
{{_0}}.


> HDFS lease issues on Flink retry
> --------------------------------
>
>                 Key: FLINK-24147
>                 URL: https://issues.apache.org/jira/browse/FLINK-24147
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hadoop Compatibility
>    Affects Versions: 1.14.0, 1.12.5, 1.13.2
>            Reporter: Matthias
>            Priority: Major
>         Attachments: jobmanager.log
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
