[
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211609#comment-16211609
]
Piotr Nowojski commented on FLINK-7737:
---------------------------------------
I have started looking into this. Are you sure that setting {{SYNC_BLOCK}} is
good enough? According to what I was able to find, it only requests a sync of
the data on the last packet of a given block and seems to have nothing to do
with {{hflush()}}. Unless I'm missing the point and we start a new data block
on each flush?
Regardless of that, I think that we cannot always {{hsync()}} the data, for
performance reasons (different semantics on different systems), so I would
lean toward option 4 proposed by [~StephanEwen]. However, the abstraction
with {{FileSystemFactory}} is a little confusing to me: it allows plugging in
a {{FileSystem}} defined in the {{flink-core}} module, but {{BucketingSink}}
and {{StreamWriterBase}} use the {{FileSystem}} and {{FSDataOutputStream}}
defined in {{hadoop-common}}.
[~ryanehobbs] am I missing something, or is this PR
https://github.com/apache/flink/pull/4781 kind of irrelevant to this issue?
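
To make the performance trade-off above concrete, here is a minimal sketch of
letting the caller choose the durability level instead of always calling
{{hsync()}}. This is not Flink or Hadoop code: {{SyncableStream}} is a
hypothetical stand-in for the {{hflush()}}/{{hsync()}} pair on Hadoop's
{{Syncable}} interface, and {{SyncPolicy}} and {{RecordingStream}} are names
invented for illustration only.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SyncPolicySketch {

    /** Hypothetical stand-in for the hflush()/hsync() pair of Hadoop's Syncable. */
    interface SyncableStream {
        void hflush() throws IOException; // make buffered data visible to new readers
        void hsync() throws IOException;  // additionally ask for the data to reach disk
    }

    /** Hypothetical policy: how strong a guarantee the sink asks for on each flush. */
    enum SyncPolicy { FLUSH_ONLY, FLUSH_AND_SYNC }

    /** Test double that only records which calls were made, in order. */
    static final class RecordingStream implements SyncableStream {
        final List<String> calls = new ArrayList<>();
        @Override public void hflush() { calls.add("hflush"); }
        @Override public void hsync() { calls.add("hsync"); }
    }

    /** Always flushes; pays for the sync only when the policy asks for it. */
    static void flush(SyncableStream os, SyncPolicy policy) throws IOException {
        os.hflush();
        if (policy == SyncPolicy.FLUSH_AND_SYNC) {
            os.hsync();
        }
    }

    public static void main(String[] args) throws IOException {
        RecordingStream cheap = new RecordingStream();
        flush(cheap, SyncPolicy.FLUSH_ONLY);

        RecordingStream durable = new RecordingStream();
        flush(durable, SyncPolicy.FLUSH_AND_SYNC);

        System.out.println(cheap.calls);   // [hflush]
        System.out.println(durable.calls); // [hflush, hsync]
    }
}
```

With something along these lines, the default could stay cheap and users on
file systems that need the stronger guarantee could opt in.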
> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.3.2
> Environment: Dev
> Reporter: Ryan Hobbs
> Assignee: Piotr Nowojski
> Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed
> that on HCFS systems where the data stream is of type FSDataOutputStream,
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the
> output stream is of type *HdfsDataOutputStream* but not for streams of type
> *FSDataOutputStream*. Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>     try {
>         // At this point the refHflushOrSync cannot be null,
>         // since register method would have thrown if it was.
>         this.refHflushOrSync.invoke(os);
>         if (os instanceof HdfsDataOutputStream) {
>             ((HdfsDataOutputStream) os).hsync(
>                 EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>         }
>     } catch (InvocationTargetException e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e.getCause());
>         Throwable cause = e.getCause();
>         if (cause != null && cause instanceof IOException) {
>             throw (IOException) cause;
>         }
>         throw new RuntimeException(msg, e);
>     } catch (Exception e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e);
>         throw new RuntimeException(msg, e);
>     }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>     ((HdfsDataOutputStream) os).hsync(
>         EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
>     os.hsync();
> }
> {code}
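
A note on the dispatch in the proposed fix above: because {{os}} is already
declared as {{FSDataOutputStream}}, the second {{instanceof}} test holds for
every non-null stream, so the branch could be a plain {{else}}. The sketch
below illustrates this with stand-in classes. {{BaseStream}} and
{{HdfsLikeStream}} are hypothetical placeholders for {{FSDataOutputStream}}
and {{HdfsDataOutputStream}}; they only record calls and do not touch Hadoop.

```java
import java.util.ArrayList;
import java.util.List;

public class SyncDispatchSketch {

    /** Hypothetical stand-in for FSDataOutputStream: only records calls. */
    static class BaseStream {
        final List<String> calls = new ArrayList<>();
        void hsync() { calls.add("hsync()"); }
    }

    /** Hypothetical stand-in for HdfsDataOutputStream, a subclass of the base type. */
    static class HdfsLikeStream extends BaseStream {
        void hsyncUpdateLength() { calls.add("hsync(UPDATE_LENGTH)"); }
    }

    /** Same shape as the proposed fix, with the redundant instanceof removed. */
    static void sync(BaseStream os) {
        if (os instanceof HdfsLikeStream) {
            ((HdfsLikeStream) os).hsyncUpdateLength();
        } else {
            // os is declared as BaseStream, so no second type test is needed here.
            os.hsync();
        }
    }

    public static void main(String[] args) {
        BaseStream plain = new BaseStream();
        sync(plain);

        HdfsLikeStream hdfs = new HdfsLikeStream();
        sync(hdfs);

        System.out.println(plain.calls); // [hsync()]
        System.out.println(hdfs.calls);  // [hsync(UPDATE_LENGTH)]
    }
}
```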