[
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16205136#comment-16205136
]
Stephan Ewen commented on FLINK-7737:
-------------------------------------
Okay, I think the root cause here is that {{hflush()}} and {{hsync()}}
semantics are somewhat different across file systems.
We could solve this in one of the following ways:
1. Always {{hsync()}} in the bucketing sink, but I fear that introduces a
performance hit for other cases (like HDFS).
2. Always pass {{SYNC_BLOCK}} to {{create()}}: would that have the same effect
as (1)?
3. Make it configurable by the user whether they need sync or only flush. But
I fear most users will get that wrong.
4. Make the file systems obey a stricter definition of {{flush()}}, meaning
it must guarantee that the data survives the loss of a TaskManager. Then it is
up to the file system implementer or the wrapper to forward these calls properly.
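To make options (1), (3) and (4) concrete, here is a minimal Java sketch that
does not depend on Hadoop. {{SyncableStream}} is a hypothetical stand-in that
only mirrors the {{hflush()}}/{{hsync()}} method names of Hadoop's {{Syncable}};
{{DurabilityMode}}, {{persist()}} and {{StrictFlushStream}} are illustrative
names, not Flink or Hadoop APIs. A fixed {{SYNC}} mode corresponds to option (1),
a user-chosen mode to option (3), and the wrapper that forwards {{hflush()}} to
{{hsync()}} to option (4).

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DurabilitySketch {

    // Hypothetical stand-in for Hadoop's Syncable (hflush()/hsync()),
    // so the sketch compiles without Hadoop on the classpath.
    interface SyncableStream {
        void hflush() throws IOException;
        void hsync() throws IOException;
    }

    // Hypothetical setting for options (1) and (3):
    // FLUSH keeps the current behaviour, SYNC always forces durable storage.
    enum DurabilityMode { FLUSH, SYNC }

    static void persist(SyncableStream out, DurabilityMode mode) throws IOException {
        if (mode == DurabilityMode.SYNC) {
            out.hsync();
        } else {
            out.hflush();
        }
    }

    // Option (4): a wrapper that gives hflush() the stronger meaning
    // "survives TaskManager loss" by forwarding it to hsync().
    static final class StrictFlushStream implements SyncableStream {
        private final SyncableStream delegate;

        StrictFlushStream(SyncableStream delegate) {
            this.delegate = delegate;
        }

        @Override
        public void hflush() throws IOException {
            delegate.hsync();
        }

        @Override
        public void hsync() throws IOException {
            delegate.hsync();
        }
    }

    // Test double that records which call reached the underlying stream.
    static final class RecordingStream implements SyncableStream {
        final List<String> calls = new ArrayList<>();

        @Override
        public void hflush() { calls.add("hflush"); }

        @Override
        public void hsync() { calls.add("hsync"); }
    }

    public static void main(String[] args) throws IOException {
        RecordingStream raw = new RecordingStream();
        persist(raw, DurabilityMode.FLUSH);
        persist(raw, DurabilityMode.SYNC);
        persist(new StrictFlushStream(raw), DurabilityMode.FLUSH);
        System.out.println(raw.calls); // prints [hflush, hsync, hsync]
    }
}
```

With option (4) the sink code stays unchanged and the stronger guarantee lives
in the wrapper, which is where a per-file-system decision could be made.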
BTW, it has just gotten a lot easier to plug in new file systems. A file system
based on Hadoop can also be explicitly exposed to handle certain situations
differently than the other file systems loaded through Hadoop's abstraction:
https://github.com/apache/flink/pull/4781 (already merged into master)
I believe this issue affects more cases than this one. For example, to
guarantee recovery after a TaskManager failure, the file system mounted under
{{file://}} needs only {{flush()}} if it is a local file system, but needs to
{{sync()}} if it is a mounted NFS-style file system.
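The local-vs-NFS distinction can be illustrated with plain {{java.io}}, as a
sketch under assumptions: {{makeRecoverable()}} and its {{networkMount}} flag
are hypothetical, and the caller is assumed to know how the path is mounted.
{{flush()}} only hands buffered bytes to the operating system, which survives a
TaskManager (JVM) crash on a local disk; {{FileDescriptor.sync()}} additionally
asks the OS to force the data to the underlying storage, which for an NFS mount
means the server.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class LocalVsNetworkDurability {

    // Hypothetical helper: the caller says whether the file lives on a network
    // mount (e.g. NFS); detecting that automatically is out of scope here.
    static void makeRecoverable(OutputStream out, FileDescriptor fd, boolean networkMount)
            throws IOException {
        // flush() moves Java-side buffered bytes into the OS. On a local disk
        // the OS page cache survives a JVM crash, though not a machine crash.
        out.flush();
        if (networkMount) {
            // sync() (fsync) asks the OS to force the file's data to the
            // underlying storage; on an NFS client that means the server.
            fd.sync();
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("bucket-", ".part");
        f.deleteOnExit();
        try (FileOutputStream file = new FileOutputStream(f);
             BufferedOutputStream out = new BufferedOutputStream(file)) {
            out.write("record-1\n".getBytes(StandardCharsets.UTF_8));
            makeRecoverable(out, file.getFD(), true);
            // The bytes are visible in the file before the stream is closed.
            System.out.print(new String(Files.readAllBytes(f.toPath()), StandardCharsets.UTF_8));
        }
    }
}
```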
> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.3.2
> Environment: Dev
> Reporter: Ryan Hobbs
> Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed
> that on HCFS systems where the data stream is of type FSDataOutputStream,
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the
> output stream is of type *HdfsDataOutputStream* but not for streams of type
> *FSDataOutputStream*. Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>     try {
>         // At this point the refHflushOrSync cannot be null,
>         // since register method would have thrown if it was.
>         this.refHflushOrSync.invoke(os);
>         if (os instanceof HdfsDataOutputStream) {
>             ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>         }
>     } catch (InvocationTargetException e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e.getCause());
>         Throwable cause = e.getCause();
>         if (cause != null && cause instanceof IOException) {
>             throw (IOException) cause;
>         }
>         throw new RuntimeException(msg, e);
>     } catch (Exception e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e);
>         throw new RuntimeException(msg, e);
>     }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>     ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
>     os.hsync();
> }
> {code}
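The proposed dispatch can be checked with a self-contained model. The classes
below are hypothetical stand-ins that only record calls; they are not Hadoop's
{{FSDataOutputStream}}/{{HdfsDataOutputStream}}, and {{hflush()}} is called
directly instead of through the reflective {{refHflushOrSync}} handle. Because
{{os}} is already declared as {{FSDataOutputStream}}, the
{{instanceof FSDataOutputStream}} branch in the proposal is always taken for a
non-null stream, so the model uses a plain {{else}}.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

public class ProposedSyncDispatch {

    // Hypothetical stand-in for FSDataOutputStream that records calls.
    static class FakeFSDataOutputStream {
        final List<String> calls = new ArrayList<>();
        void hflush() { calls.add("hflush"); }
        void hsync() { calls.add("hsync"); }
    }

    // Hypothetical stand-in for HdfsDataOutputStream.SyncFlag.
    enum SyncFlag { UPDATE_LENGTH }

    // Hypothetical stand-in for HdfsDataOutputStream with the flag-taking hsync.
    static class FakeHdfsDataOutputStream extends FakeFSDataOutputStream {
        void hsync(EnumSet<SyncFlag> flags) { calls.add("hsync" + flags); }
    }

    // Models the proposed hflushOrSync(): flush first, then sync every stream.
    static void hflushOrSync(FakeFSDataOutputStream os) {
        os.hflush();
        if (os instanceof FakeHdfsDataOutputStream) {
            ((FakeHdfsDataOutputStream) os).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
        } else {
            // Static type already guarantees an FSDataOutputStream here.
            os.hsync();
        }
    }

    public static void main(String[] args) {
        FakeFSDataOutputStream generic = new FakeFSDataOutputStream();
        FakeHdfsDataOutputStream hdfs = new FakeHdfsDataOutputStream();
        hflushOrSync(generic);
        hflushOrSync(hdfs);
        System.out.println(generic.calls); // prints [hflush, hsync]
        System.out.println(hdfs.calls);    // prints [hflush, hsync[UPDATE_LENGTH]]
    }
}
```

In a real fix the generic branch would call Hadoop's {{hsync()}}, whose actual
durability guarantee still depends on how the particular HCFS implementation
honours it.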
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)