[ https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199070#comment-16199070 ]
Ryan Hobbs commented on FLINK-7737:
-----------------------------------
In the case of hflush(), the client simply flushes its buffer; there is no
guarantee that the data is synced to disk. As a result, we have seen data loss
in failure scenarios when hflush() is used. Is it possible for Flink to pass in
the SYNC_BLOCK flag on create()? If that flag is set, I believe a call to
hflush() will perform an hsync().
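By analogy only, the hflush()/hsync() distinction resembles the difference in
plain Java between handing bytes to the operating system and forcing them to
the storage device with FileChannel.force(). The sketch below uses only
java.nio, not Hadoop, and the class DurableWriteDemo is hypothetical: the
write() step plays the role of hflush() (data visible, possibly only in
memory) and force(true) plays the role of hsync() (data pushed to disk).

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical local-file analogy, not Hadoop code.
public class DurableWriteDemo {

    // Appends text to a file; if sync is true, also forces it to the device.
    static void appendDurably(Path file, String text, boolean sync) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            ByteBuffer buf = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                ch.write(buf); // "hflush"-like: bytes handed to the OS, may sit in the page cache
            }
            if (sync) {
                ch.force(true); // "hsync"-like: data and metadata forced to the storage device
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("durable", ".txt");
        appendDurably(tmp, "record-1\n", false); // visible, but not guaranteed durable
        appendDurably(tmp, "record-2\n", true);  // visible and forced to disk
        System.out.print(Files.readString(tmp));
        Files.delete(tmp);
    }
}
```

Both records are readable afterwards either way (main prints record-1 and
record-2); the difference only shows up if the machine loses power before the
OS writes its cache back, which is exactly the failure case described above.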
> On HCFS systems, FSDataOutputStream does not issue hsync, only hflush, which
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.3.2
> Environment: Dev
> Reporter: Ryan Hobbs
>
> During several tests in which we simulated failure conditions, we observed
> that on HCFS systems where the data stream is of type FSDataOutputStream,
> Flink issues hflush() but not hsync(), which results in data loss.
> In the class *StreamWriterBase.java*, the code below calls hsync() if the
> output stream is of type *HdfsDataOutputStream*, but not for other streams of
> type *FSDataOutputStream*. Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>     try {
>         // At this point the refHflushOrSync cannot be null,
>         // since register method would have thrown if it was.
>         this.refHflushOrSync.invoke(os);
>         if (os instanceof HdfsDataOutputStream) {
>             ((HdfsDataOutputStream) os).hsync(
>                     EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>         }
>     } catch (InvocationTargetException e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e.getCause());
>         Throwable cause = e.getCause();
>         if (cause != null && cause instanceof IOException) {
>             throw (IOException) cause;
>         }
>         throw new RuntimeException(msg, e);
>     } catch (Exception e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e);
>         throw new RuntimeException(msg, e);
>     }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type
> *FSDataOutputStream*?
> {code}
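> if (os instanceof HdfsDataOutputStream) {
>     ((HdfsDataOutputStream) os).hsync(
>             EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else {
>     // os is declared as FSDataOutputStream, so an "instanceof
>     // FSDataOutputStream" check here would always be true; a plain
>     // else covers every other stream.
>     os.hsync();
> }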
> {code}
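To compare the current and proposed behavior without a Hadoop dependency, the
sketch below models the dispatch in hflushOrSync(). The classes SimStream
(standing in for FSDataOutputStream) and SimHdfsStream (standing in for
HdfsDataOutputStream) are hypothetical, and the Method argument m stands in for
the reflectively resolved refHflushOrSync. It also mirrors the unwrapping of
InvocationTargetException so that an underlying IOException is rethrown as-is.

```java
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical model of the hflushOrSync() dispatch; no Hadoop classes involved.
public class HflushOrSyncModel {

    public static class SimStream {           // plays FSDataOutputStream
        int flushes = 0;
        int syncs = 0;
        public void hflush() throws IOException { flushes++; }
        public void hsync() throws IOException { syncs++; }
    }

    public static class SimHdfsStream extends SimStream { // plays HdfsDataOutputStream
        // Stand-in for hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)).
        public void hsyncUpdateLength() throws IOException { syncs++; }
    }

    // Reflective flush call; an IOException thrown by the target is rethrown unwrapped.
    static void invokeFlush(Method m, SimStream os) throws IOException {
        try {
            m.invoke(os);
        } catch (InvocationTargetException e) {
            if (e.getCause() instanceof IOException) { // instanceof is false for null
                throw (IOException) e.getCause();
            }
            throw new RuntimeException("Error while trying to hflushOrSync!", e);
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Error while trying to hflushOrSync!", e);
        }
    }

    // Current behavior: only the HDFS stream is synced after the flush.
    static void current(Method m, SimStream os) throws IOException {
        invokeFlush(m, os);
        if (os instanceof SimHdfsStream) {
            ((SimHdfsStream) os).hsyncUpdateLength();
        }
    }

    // Proposed behavior: every other stream gets a plain hsync() as well.
    static void proposed(Method m, SimStream os) throws IOException {
        invokeFlush(m, os);
        if (os instanceof SimHdfsStream) {
            ((SimHdfsStream) os).hsyncUpdateLength();
        } else {
            os.hsync();
        }
    }

    public static void main(String[] args) throws Exception {
        Method hflush = SimStream.class.getMethod("hflush");
        SimStream a = new SimStream();
        SimStream b = new SimStream();
        current(hflush, a);
        proposed(hflush, b);
        System.out.println(a.syncs + " " + b.syncs); // prints "0 1"
    }
}
```

In this model a non-HDFS stream is flushed but never synced under the current
logic, while the proposed else branch adds the missing sync. Whether a real
HCFS stream then persists data on hsync() still depends on that file system's
implementation.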