[ 
https://issues.apache.org/jira/browse/FLINK-4725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683703#comment-15683703
 ] 

Kostas Kloudas commented on FLINK-4725:
---------------------------------------

I think this issue is already resolved in the BucketingSink, and it will be 
resolved in the RollingSink by an upcoming pull request.
With these changes, neither sink performs cleanup when restoring after a failure. 

You can also check the discussion here: 
https://issues.apache.org/jira/browse/FLINK-5083 and in the PR referenced in 
that JIRA.

> BucketingSink throws NPE while restoring state if basePath does not exist
> -------------------------------------------------------------------------
>
>                 Key: FLINK-4725
>                 URL: https://issues.apache.org/jira/browse/FLINK-4725
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.2.0, 1.1.2
>            Reporter: Jordan Ganoff
>            Priority: Blocker
>
> BucketingSink throws a NullPointerException when attempting to clean up 
> pending files if the basePath does not exist.
> The culprit is a [call to org.apache.hadoop.fs.FileSystem.listFiles() on line 
> 784|https://github.com/apache/flink/blob/master/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L784].
> I added this longer description from the duplicate FLINK-4779 (aljoscha):
> When I restore from a savepoint, starting the job fails when the root folder 
> used by the BucketingSink does not yet exist. This can happen in my case when 
> the source for my sink has not yet emitted any messages and I have not created 
> the folder by hand.
> The complete folder structure is not required by the BucketingSink, as it will 
> create intermediate folders by itself when creating the bucket.
> I suggest that this should not prevent the job from being restarted.
> {code}
> 10/07/2016 22:50:53     Source: Kafka Consumer for X -> (Sink: HDFS for X, Sink: X)(1/1) switched to FAILED
> java.lang.Exception: Failed to restore state to function: Error while deleting old pending files.
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:184)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:550)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:255)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while deleting old pending files.
>         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:805)
>         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:139)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:182)
>         ... 4 more
> Caused by: java.io.FileNotFoundException: File hdfs://server:8020/1/2/3/4/flink does not exist.
>         at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:948)
>         at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:927)
>         at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:872)
>         at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:868)
>         at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>         at org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:886)
>         at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1696)
>         at org.apache.hadoop.fs.FileSystem$6.<init>(FileSystem.java:1791)
>         at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1787)
>         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:784)
>         ... 6 more
> {code}
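
The trace in the issue shows the restore failing because a directory listing 
was requested for a base path that does not exist. A minimal sketch of the 
reporter's suggestion (treat a missing base path as "nothing to clean up") 
might look like the following. This is not the BucketingSink code and not the 
change described in the comment on this issue, which stops cleaning up on 
restore altogether. It uses java.nio.file as a local stand-in for the Hadoop 
FileSystem API, and the class name, method name, and ".pending" suffix are 
assumptions made for illustration only.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PendingFileCleanup {

    // Assumed suffix for illustration; the real sink makes this configurable.
    static final String PENDING_SUFFIX = ".pending";

    /**
     * Deletes all regular files under basePath whose name ends with
     * PENDING_SUFFIX and returns the deleted paths. If basePath is not an
     * existing directory, there is nothing to clean up, so an empty list is
     * returned instead of letting the recursive listing throw.
     */
    static List<Path> deletePendingFiles(Path basePath) throws IOException {
        if (!Files.isDirectory(basePath)) {
            return new ArrayList<>();
        }
        List<Path> pending;
        // Files.walk lists recursively; close the stream before deleting.
        try (Stream<Path> walk = Files.walk(basePath)) {
            pending = walk
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(PENDING_SUFFIX))
                    .collect(Collectors.toList());
        }
        for (Path p : pending) {
            Files.delete(p);
        }
        return pending;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("sink-demo");

        // Base path that was never created: no exception, nothing deleted.
        System.out.println(deletePendingFiles(tmp.resolve("not-created")).size());

        // Existing base path: only the pending file is removed.
        Files.createFile(tmp.resolve("part-0-0.pending"));
        Files.createFile(tmp.resolve("part-0-1"));
        System.out.println(deletePendingFiles(tmp).size());
    }
}
```

Without the directory check, Files.walk on the missing path would throw a 
NoSuchFileException, which is the local analogue of the FileNotFoundException 
in the trace.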



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)