[ 
https://issues.apache.org/jira/browse/FLINK-24147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411055#comment-17411055
 ] 

Matthias commented on FLINK-24147:
----------------------------------

Just for documentation purposes since the communication happened offline:
{quote}
What Flink version are you using?
-> Flink version -  1.9.2
What does GRHadoopOutputFormat.open do? This seems to be a custom 
implementation. There's still a question mark behind it being a Flink issue 
because I would have assumed that other's ran into this issue as well.
-> GRHadoopOutputFormat is a minor patch over HadoopOutputFormatBase, below is 
the diff in open() function
* HadoopOutputFormatBase
  this.configuration.set("mapreduce.output.basename", "tmp");
* GRHadoopOutputFormat
  this.configuration.set("mapreduce.output.basename", getFullFileName());

getFullFileName() function for most cases returns a string “partMapper-r-”
{quote}

> HDFS lease issues on Flink retry
> --------------------------------
>
>                 Key: FLINK-24147
>                 URL: https://issues.apache.org/jira/browse/FLINK-24147
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hadoop Compatibility
>    Affects Versions: 1.14.0, 1.12.5, 1.13.2
>            Reporter: Matthias
>            Priority: Major
>         Attachments: jobmanager.log
>
>
> This issue was brought up on the [ML thread "hdfs lease issues on flink 
> retry"|https://lists.apache.org/x/thread.html/r9e5dc9cbd0a41b88565bd6c8c1c9d864ffdd343b4a96bd4dd0dd8a97@%3Cuser.flink.apache.org%3E].
>  See attached  [^jobmanager.log] which was provided by the user:
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>  Failed to CREATE_FILE 
> /user/p2epda/lake/delp_prod/PROD/APPROVED/data/gsam_qa_campaign_extract/NmsRtEvent/2728/temp/data/_temporary/0/_temporary/attempt__0000_r_000008_0/partMapper-r-00008.snappy.parquet
>  for DFSClient_NONMAPREDUCE_-910267331_98 on 10.59.155.152 because this file 
> lease is currently owned by DFSClient_NONMAPREDUCE_1747340094_77 on 
> 10.50.196.245
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3194)
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2813)
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)
>         at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)
>         at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)
>         at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>         at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
>         at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1498)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1398)
>         at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
>         at com.sun.proxy.$Proxy19.create(Unknown Source)
>         at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:313)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:290)
>         at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:202)
>         at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:184)
>         at com.sun.proxy.$Proxy20.create(Unknown Source)
>         at 
> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1828)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1712)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1647)
>         at 
> org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:480)
>         at 
> org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:476)
>         at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>         at 
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:491)
>         at 
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:417)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:930)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
>         at 
> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:236)
>         at 
> org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
>         at 
> org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
>         at 
> com.gs.ep.lake.flinkbasics.GRHadoopOutputFormat.open(GRHadoopOutputFormat.java:90)
>         at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> The user ran into {{FileAlreadyExistsException}} when it tried to create a 
> file for which a lease already existed. Admittedly, they use their own 
> implementation with {{GRHadoopOutputFormat}}. But [~dmvk] helped 
> investigating this as he was familiar with the error.
> We seem to have the problem in {{HadoopOutputFormatBase}} where we use a 
> fixed retry id {{0}} in 
> [HadoopOutputFormatBase:137|https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java#L137].
> Each resource in HDFS is allowed to have only one Writer accessing it. The 
> LeaseManager manages this through leases. It appears that we tried to access 
> the same file through another task due to {{HadoopOutputFormatBase}} 
> generating the same {{TaskAttemptId}}. The retry interval was shorter (in 
> that case 10 seconds) than Hadoop's hard-coded soft lease limit of 1min (see 
> [hadoop:HdfsConstants:62|https://github.com/apache/hadoop/blob/a9c1489e31e8f602de62bd3ecc517aa6597ab2f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java#L62]).
> We could be able to overcome this by adding a dynamic retry count instead of 
> {{_0}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to