[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17146770#comment-17146770 ]
JIAN WANG edited comment on FLINK-13874 at 6/27/20, 3:50 AM:
-------------------------------------------------------------

Hi [~gyfora], I still hit the same issue on flink-1.10.1. I run Flink on YARN (3.0.0-cdh6.3.2) with StreamingFileSink. The relevant code looks like this:

{code:java}
public static <IN> StreamingFileSink<IN> build(String dir, BucketAssigner<IN, String> assigner, String prefix) {
    return StreamingFileSink
        .forRowFormat(new Path(dir), new SimpleStringEncoder<IN>())
        .withRollingPolicy(DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.HOURS.toMillis(2))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(10))
            .withMaxPartSize(1024L * 1024L * 1024L * 50) // max part size: 50 GB
            .build())
        .withBucketAssigner(assigner)
        .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(prefix).build())
        .build();
}
{code}

The error is:

{noformat}
java.io.IOException: Problem while truncating file: hdfs:///business_log/hashtag/2020-06-25/.hashtag-122-37.inprogress.8e65f69c-b5ba-4466-a844-ccc0a5a93de2
{noformat}

Because of this issue, the job cannot restart from the latest checkpoint or savepoint.
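For context, here is a minimal sketch of how such a helper gets attached to a stream. The source, bucket assigner, and checkpoint interval below are illustrative stand-ins, not the actual production values:

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamingFileSink only finalizes part files on checkpoints, so checkpointing must be enabled.
env.enableCheckpointing(60_000);

DataStream<String> events = env.socketTextStream("localhost", 9999); // placeholder source

// Buckets by day (e.g. 2020-06-25), matching the path in the error above;
// build(...) is the helper shown in the previous snippet.
events.addSink(build("hdfs:///business_log/hashtag", new DateTimeBucketAssigner<>("yyyy-MM-dd"), "hashtag"));

env.execute("hashtag-to-hdfs");
{code}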
> StreamingFileSink fails to recover (truncate) properly
> ------------------------------------------------------
>
>                 Key: FLINK-13874
>                 URL: https://issues.apache.org/jira/browse/FLINK-13874
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.9.0
>            Reporter: Gyula Fora
>            Priority: Blocker
>
> It seems that there might be some problem with the truncate / recovery logic for the HadoopRecoverableFsDataOutputStream.
> I keep hitting the following error:
>
> {noformat}
> java.io.IOException: Problem while truncating file: hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166)
>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:89)
>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to TRUNCATE_FILE /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder.
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
>     at org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
>     at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
>     at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)
>     at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1499)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1445)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1355)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>     at com.sun.proxy.$Proxy17.truncate(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:366)
>     at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
>     at com.sun.proxy.$Proxy18.truncate(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:1545)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:862)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:859)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:869)
>     at sun.reflect.GeneratedMethodAccessor70.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.truncate(HadoopRecoverableFsDataOutputStream.java:197)
>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:164)
> {noformat}
> Seems like even though we waited for the lease before starting the truncate, it's still not ours.
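For anyone digging into the recovery path: below is a simplified sketch of the recover-then-truncate sequence that HadoopRecoverableFsDataOutputStream performs on restore. It is a paraphrase for illustration, not the actual Flink code; the class name, method name, and timeout value are made up here, while recoverLease/isFileClosed/truncate are real DistributedFileSystem APIs:

{code:java}
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

/** Simplified sketch of the recover-then-truncate sequence; not the actual Flink code. */
final class RecoverThenTruncateSketch {

    private static final long LEASE_TIMEOUT_MS = 100_000L; // illustrative bound

    static void recoverAndTruncate(DistributedFileSystem dfs, Path file, long checkpointedLength)
            throws IOException, InterruptedException {
        // Step 1: ask the NameNode to recover the lease on the in-progress
        // file, then poll until the file is reported closed (or we give up).
        boolean closed = dfs.recoverLease(file);
        long deadline = System.currentTimeMillis() + LEASE_TIMEOUT_MS;
        while (!closed && System.currentTimeMillis() < deadline) {
            Thread.sleep(500L);
            closed = dfs.isFileClosed(file);
        }
        if (!closed) {
            throw new IOException("Lease recovery did not complete for " + file);
        }

        // Step 2: truncate back to the length recorded in the last checkpoint.
        // This is the call failing in the trace above. truncate(...) returning
        // false means the last block is still being adjusted, so the caller
        // must wait for the file to close before reopening it for append.
        if (!dfs.truncate(file, checkpointedLength)) {
            deadline = System.currentTimeMillis() + LEASE_TIMEOUT_MS;
            while (!dfs.isFileClosed(file) && System.currentTimeMillis() < deadline) {
                Thread.sleep(500L);
            }
        }
    }
}
{code}

If step 1 reports the file closed while our own DFSClient still holds (or immediately re-acquires) the lease, the NameNode rejects the TRUNCATE_FILE in step 2, which matches the "is already the current lease holder" message in the trace.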