[ https://issues.apache.org/jira/browse/FLINK-9751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stephan Ewen updated FLINK-9751:
--------------------------------
Component/s: Streaming Connectors
> Add a RecoverableWriter to the FileSystem abstraction
> -----------------------------------------------------
>
> Key: FLINK-9751
> URL: https://issues.apache.org/jira/browse/FLINK-9751
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.6.0
>
>
> The core operation of the StreamingFileSink is to append result data to
> (hidden) "in progress" files and then, when the files should roll over,
> publish them as visible files. At each checkpoint, all data written so far
> must be persisted in the "in progress" files. On recovery, we either resume
> the "in progress" file at the exact position of the checkpoint, or publish
> the file up to the position of that checkpoint.
> In order to support various file systems and object stores, we need an
> interface that captures these core operations and allows for different
> implementations (such as file truncate/append on POSIX, MultiPartUpload on
> S3, ...)
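
To make the truncate/append variant concrete, here is a minimal sketch of how it could look on a local, POSIX-like file system. It is not Flink code: the class `LocalInProgressFile` and all of its method names are hypothetical, and it assumes the hidden file and the target file live in the same directory so that the rename can be atomic.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper (not part of Flink): truncate-and-rename on a local file. */
final class LocalInProgressFile implements AutoCloseable {

    private final Path inProgress;
    private final FileChannel channel;

    /** Opens (or creates) the hidden in-progress file and positions at its end. */
    LocalInProgressFile(Path inProgress) throws IOException {
        this.inProgress = inProgress;
        this.channel = FileChannel.open(
                inProgress, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        channel.position(channel.size());
    }

    /** Appends data to the in-progress file. */
    void write(String data) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
    }

    /** Checkpoint: forces the data to disk and returns the offset to store in the checkpoint. */
    long persist() throws IOException {
        channel.force(true);
        return channel.position();
    }

    /** Recovery: cuts off everything written after the checkpointed offset, then resumes. */
    static LocalInProgressFile resume(Path inProgress, long offset) throws IOException {
        try (FileChannel ch = FileChannel.open(inProgress, StandardOpenOption.WRITE)) {
            ch.truncate(offset);
            ch.force(true);
        }
        return new LocalInProgressFile(inProgress);
    }

    /** Roll over: closes the file and publishes it under its visible name. */
    void commit(Path target) throws IOException {
        channel.force(true);
        channel.close();
        Files.move(inProgress, target, StandardCopyOption.ATOMIC_MOVE);
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

An object store such as S3 has no truncate or rename with these semantics, which is presumably why the issue asks for an interface rather than hard-coding this approach.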
> Proposed interface:
> {code:java}
> /**
>  * A handle to an in-progress stream with a defined and persistent amount of data.
>  * The handle can be used to recover the stream and publish the result file.
>  */
> interface CommitRecoverable { ... }
>
> /**
>  * A handle to an in-progress stream with a defined and persistent amount of data.
>  * The handle can be used to recover the stream and either publish the result file
>  * or keep appending data to the stream.
>  */
> interface ResumeRecoverable extends CommitRecoverable { ... }
>
> /**
>  * An output stream to a file system that can be recovered at well-defined points.
>  * The stream initially writes to hidden files or temp files and only creates the
>  * target file once it is closed and "committed".
>  */
> public abstract class RecoverableFsDataOutputStream extends FSDataOutputStream {
>
>     /**
>      * Ensures all data so far is persistent (similar to {@link #sync()}) and returns
>      * a handle to recover the stream at the current position.
>      */
>     public abstract ResumeRecoverable persist() throws IOException;
>
>     /**
>      * Closes the stream, ensuring persistence of all data (similar to {@link #sync()}).
>      * This returns a Committer that can be used to publish (make visible) the file
>      * that the stream was writing to.
>      */
>     public abstract Committer closeForCommit() throws IOException;
>
>     /**
>      * A committer can publish the file of a stream that was closed.
>      * The Committer can be recovered via a {@link CommitRecoverable}.
>      */
>     public interface Committer {
>         void commit() throws IOException;
>         CommitRecoverable getRecoverable();
>     }
> }
>
> /**
>  * The RecoverableWriter creates and recovers RecoverableFsDataOutputStream.
>  */
> public interface RecoverableWriter {
>     RecoverableFsDataOutputStream open(Path path) throws IOException;
>     RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;
>     RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable) throws IOException;
> }
> {code}
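
The intended call sequence of the proposed interface (open, write, persist at a checkpoint, recover after a failure, close for commit, commit) could be sketched with an in-memory stand-in. Everything in this block is a hypothetical simplification for illustration, not the proposed implementation: `InMemoryRecoverableWriter` keeps data in byte arrays, uses `String` paths instead of `Path`, and collapses `CommitRecoverable` and `ResumeRecoverable` into a single `Recoverable` class.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in that mirrors the proposed call sequence. */
final class InMemoryRecoverableWriter {

    /** Simplified handle: the target path plus the bytes persisted so far. */
    static final class Recoverable {
        final String path;
        final byte[] persisted;

        Recoverable(String path, byte[] persisted) {
            this.path = path;
            this.persisted = persisted;
        }
    }

    /** "Visible" files; an entry appears only after commit(). */
    final Map<String, byte[]> published = new HashMap<>();

    Stream open(String path) {
        return new Stream(path, new byte[0]);
    }

    /** Resumes writing at exactly the persisted position. */
    Stream recover(Recoverable recoverable) {
        return new Stream(recoverable.path, recoverable.persisted);
    }

    /** Recreates a committer so that a pending publish can be completed. */
    Committer recoverForCommit(Recoverable recoverable) {
        return new Committer(recoverable);
    }

    final class Stream {
        private final String path;
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        Stream(String path, byte[] initial) {
            this.path = path;
            buffer.write(initial, 0, initial.length);
        }

        void write(byte[] data) {
            buffer.write(data, 0, data.length);
        }

        /** Snapshot of everything written so far; would go into the checkpoint. */
        Recoverable persist() {
            return new Recoverable(path, buffer.toByteArray());
        }

        Committer closeForCommit() {
            return new Committer(persist());
        }
    }

    final class Committer {
        private final Recoverable recoverable;

        Committer(Recoverable recoverable) {
            this.recoverable = recoverable;
        }

        /** Publishes the file; calling it again publishes the same bytes. */
        void commit() {
            published.put(recoverable.path, recoverable.persisted);
        }

        Recoverable getRecoverable() {
            return recoverable;
        }
    }
}
```

In this sketch, bytes written after the last `persist()` are simply absent from the handle, so a stream obtained from `recover(...)` continues from the checkpointed position, and nothing is visible in `published` until `commit()` runs.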