[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212857#comment-16212857
 ] 

Piotr Nowojski edited comment on FLINK-7737 at 10/20/17 8:10 PM:
-----------------------------------------------------------------

{{StreamWriterBase#close}} issues a {{flush()}} call, so the {{syncOnFlush}} 
flag is also honoured when closing. Which "create()" do you have in mind?
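
To illustrate the point about close(), here is a minimal, self-contained sketch. It is not the actual {{StreamWriterBase}} code: {{RecordingStream}} and {{SketchWriter}} are hypothetical stand-ins, and the only behaviour modelled is that close() goes through flush(), so the sync-on-flush choice also applies on close.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output stream; it only records the calls it receives.
class RecordingStream {
    final List<String> calls = new ArrayList<>();

    void hflush() { calls.add("hflush"); }

    void hsync() { calls.add("hsync"); }

    void close() { calls.add("close"); }
}

// Simplified model of a writer with a syncOnFlush flag (an assumption-based sketch,
// not Flink's implementation).
class SketchWriter {
    private final RecordingStream out;
    private final boolean syncOnFlush;

    SketchWriter(RecordingStream out, boolean syncOnFlush) {
        this.out = out;
        this.syncOnFlush = syncOnFlush;
    }

    // Sync when the flag is set, otherwise only flush.
    void flush() {
        if (syncOnFlush) {
            out.hsync();
        } else {
            out.hflush();
        }
    }

    // close() flushes first, so the flag is honoured here as well.
    void close() {
        flush();
        out.close();
    }
}
```

With the flag set, closing the writer reaches the stream as an hsync followed by a close; with the flag unset, as an hflush followed by a close.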


was (Author: pnowojski):
What do you mean? {{StreamWriterBase#close}} issues a {{flush()}} call, so 
{{syncOnFlush}} flag will be honoured also when closing. What "create()" do you 
have in mind?

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7737
>                 URL: https://issues.apache.org/jira/browse/FLINK-7737
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>         Environment: Dev
>            Reporter: Ryan Hobbs
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> During several tests in which we simulated failure conditions, we observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink issues hflush() but not hsync(), which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>     try {
>         // At this point the refHflushOrSync cannot be null,
>         // since register method would have thrown if it was.
>         this.refHflushOrSync.invoke(os);
>         if (os instanceof HdfsDataOutputStream) {
>             ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>         }
>     } catch (InvocationTargetException e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e.getCause());
>         Throwable cause = e.getCause();
>         if (cause != null && cause instanceof IOException) {
>             throw (IOException) cause;
>         }
>         throw new RuntimeException(msg, e);
>     } catch (Exception e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e);
>         throw new RuntimeException(msg, e);
>     }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>     ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
>     os.hsync();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
