[ https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221844#comment-16221844 ]
ASF GitHub Bot commented on FLINK-7737:
---------------------------------------
Github user pnowojski commented on the issue:
https://github.com/apache/flink/pull/4876
I assumed that it is up to concrete implementations of `StreamWriterBase`
to implement `equals` completely, for example based on `configuration`.
Alternatively, including `outStream` in the equality/hash implementation is also a
solution, with slightly different semantics.
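The two options can be sketched in plain Java. `ConfiguredWriter` and `StreamAwareWriter` below are hypothetical stand-ins, not Flink's `StreamWriterBase` API, and `configuration` is modelled as a simple `Map<String, String>` purely for illustration. The first compares writers by configuration alone; the second also requires both writers to hold the very same open stream instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class WriterEquality {

    // Hypothetical stand-in for a concrete StreamWriterBase subclass.
    static class ConfiguredWriter {
        final Map<String, String> configuration;
        OutputStream outStream; // the currently open stream, or null

        ConfiguredWriter(Map<String, String> configuration) {
            this.configuration = new HashMap<>(configuration);
        }

        // Option A: equality depends only on the configuration; the open stream is ignored.
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            return configuration.equals(((ConfiguredWriter) o).configuration);
        }

        @Override
        public int hashCode() {
            return configuration.hashCode();
        }
    }

    // Option B: additionally requires both writers to hold the same stream instance.
    static class StreamAwareWriter extends ConfiguredWriter {
        StreamAwareWriter(Map<String, String> configuration) {
            super(configuration);
        }

        @Override
        public boolean equals(Object o) {
            // super.equals has already checked that o has exactly this class.
            return super.equals(o) && outStream == ((StreamAwareWriter) o).outStream;
        }

        @Override
        public int hashCode() {
            // identityHashCode(null) is 0, so writers without an open stream still agree.
            return Objects.hash(configuration, System.identityHashCode(outStream));
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = Collections.singletonMap("compression", "none");

        ConfiguredWriter a = new ConfiguredWriter(conf);
        ConfiguredWriter b = new ConfiguredWriter(conf);
        a.outStream = new ByteArrayOutputStream();
        b.outStream = new ByteArrayOutputStream();
        System.out.println("config-only equal: " + a.equals(b)); // true

        StreamAwareWriter c = new StreamAwareWriter(conf);
        StreamAwareWriter d = new StreamAwareWriter(conf);
        c.outStream = new ByteArrayOutputStream();
        d.outStream = new ByteArrayOutputStream();
        System.out.println("stream-aware equal: " + c.equals(d)); // false
    }
}
```

One consequence of the stream-aware variant: its hash code changes whenever a new stream is opened, so such writers should not sit in hash-based collections while files are being rolled.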
> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.3.2
> Environment: Dev
> Reporter: Ryan Hobbs
> Assignee: Piotr Nowojski
> Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests in which we simulated failure conditions, we observed
> that on HCFS (Hadoop-compatible file system) systems where the data stream is of type FSDataOutputStream,
> Flink issues hflush() but not hsync(), which results in data loss.
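The difference matters because, per Hadoop's `Syncable` contract, `hflush()` only guarantees that new readers will see the flushed data, while `hsync()` additionally asks for it to be written through to the disk device, similar to POSIX `fsync`. As a local-file analogy only (plain `java.io`, not the HDFS client; `FlushVersusSync` and `writeRecord` are hypothetical names), `flush()` moves bytes out of the user-space buffer into the OS, and `FileDescriptor.sync()` asks the OS to force them to the device:

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class FlushVersusSync {

    // Appends a record, makes it visible to readers (flush), and optionally asks the OS
    // to force it to the storage device (sync). This mirrors hflush vs hsync locally.
    static void writeRecord(File file, String record, boolean durable) throws IOException {
        try (FileOutputStream fileOut = new FileOutputStream(file, true);
             BufferedOutputStream out = new BufferedOutputStream(fileOut)) {
            out.write(record.getBytes(StandardCharsets.UTF_8));
            // Like hflush(): bytes leave the user-space buffer and become visible to
            // other readers, but may still live only in the OS page cache.
            out.flush();
            if (durable) {
                // Like hsync(): block until the OS has synchronized its buffers
                // with the underlying device.
                fileOut.getFD().sync();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("flush-vs-sync", ".log");
        file.deleteOnExit();
        writeRecord(file, "visible\n", false);
        writeRecord(file, "durable\n", true);
        System.out.print(new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8));
    }
}
```

After a crash of the machine (not just the process), only data written through the `sync()` path is expected to survive, which is the gap the report describes for non-HDFS streams.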
> In the class *StreamWriterBase.java*, the code below executes hsync only if the
> output stream is of type *HdfsDataOutputStream*, and not for other streams of type
> *FSDataOutputStream*. Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>     try {
>         // At this point the refHflushOrSync cannot be null,
>         // since register method would have thrown if it was.
>         this.refHflushOrSync.invoke(os);
>         // Only HDFS streams get an explicit hsync; any other FSDataOutputStream
>         // is left with just the reflective flush call above.
>         if (os instanceof HdfsDataOutputStream) {
>             ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>         }
>     } catch (InvocationTargetException e) {
>         // Unwrap the exception thrown by the stream's flush method itself.
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e.getCause());
>         Throwable cause = e.getCause();
>         if (cause != null && cause instanceof IOException) {
>             throw (IOException) cause;
>         }
>         throw new RuntimeException(msg, e);
>     } catch (Exception e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e);
>         throw new RuntimeException(msg, e);
>     }
> }
> {code}
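The snippet invokes the flush through a `java.lang.reflect.Method` (`refHflushOrSync`) and unwraps `InvocationTargetException` so that an `IOException` raised by the stream reaches the caller unchanged. A self-contained sketch of that pattern follows; `FlakyStream` is a hypothetical stream class, and the `hflush()`-then-`sync()` lookup in `findFlushMethod` is an assumption about why reflection is used, not code taken from Flink:

```java
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectiveFlush {

    // Hypothetical stream type standing in for FSDataOutputStream.
    public static class FlakyStream {
        int flushes;
        boolean failNext;

        public void hflush() throws IOException {
            if (failNext) {
                throw new IOException("datanode unreachable");
            }
            flushes++;
        }
    }

    // Assumed lookup: prefer hflush(), fall back to an older sync() if it is absent.
    static Method findFlushMethod(Class<?> streamClass) throws NoSuchMethodException {
        try {
            return streamClass.getMethod("hflush");
        } catch (NoSuchMethodException e) {
            return streamClass.getMethod("sync");
        }
    }

    static void flush(Method flushMethod, Object stream) throws IOException {
        try {
            flushMethod.invoke(stream);
        } catch (InvocationTargetException e) {
            // Reflection wraps whatever the target threw; rethrow IOExceptions as-is.
            Throwable cause = e.getCause();
            if (cause instanceof IOException) { // instanceof is already false for null
                throw (IOException) cause;
            }
            throw new RuntimeException("Error while trying to hflushOrSync!", e);
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Error while trying to hflushOrSync!", e);
        }
    }

    public static void main(String[] args) throws Exception {
        FlakyStream stream = new FlakyStream();
        Method m = findFlushMethod(FlakyStream.class);
        flush(m, stream);
        System.out.println("flushes: " + stream.flushes); // flushes: 1
        stream.failNext = true;
        try {
            flush(m, stream);
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage()); // caught: datanode unreachable
        }
    }
}
```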
> Could a potential fix be to perform a sync even on streams of type
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>     ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else {
>     // os is declared as FSDataOutputStream, so every other stream gets a plain hsync()
>     os.hsync();
> }
> {code}
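A runnable sketch of the proposed dispatch, using hypothetical `GenericStream` and `HdfsStream` classes that merely record calls in place of Hadoop's `FSDataOutputStream` and `HdfsDataOutputStream`. Because `os` is already declared as the base stream type, every stream that is not the HDFS subtype takes the plain `hsync()` branch:

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

public class SyncDispatch {

    // Hypothetical mirror of HdfsDataOutputStream.SyncFlag.
    enum SyncFlag { UPDATE_LENGTH }

    // Hypothetical stand-in for FSDataOutputStream; records which calls were made.
    static class GenericStream {
        final List<String> calls = new ArrayList<>();

        void hflush() { calls.add("hflush"); }

        void hsync() { calls.add("hsync"); }
    }

    // Hypothetical stand-in for HdfsDataOutputStream with its flag-taking overload.
    static class HdfsStream extends GenericStream {
        void hsync(EnumSet<SyncFlag> flags) { calls.add("hsync" + flags); }
    }

    // Proposed behaviour: always flush, then hsync regardless of the stream type.
    static void hflushOrSync(GenericStream os) {
        os.hflush();
        if (os instanceof HdfsStream) {
            // In real HDFS, UPDATE_LENGTH also updates the block length on the
            // NameNode; here the flag is only recorded.
            ((HdfsStream) os).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
        } else {
            os.hsync();
        }
    }

    public static void main(String[] args) {
        GenericStream generic = new GenericStream();
        HdfsStream hdfs = new HdfsStream();
        hflushOrSync(generic);
        hflushOrSync(hdfs);
        System.out.println(generic.calls); // [hflush, hsync]
        System.out.println(hdfs.calls);    // [hflush, hsync[UPDATE_LENGTH]]
    }
}
```

Whether `hsync()` actually reaches durable storage still depends on how the underlying HCFS implements it; the sketch only shows which method gets called for each stream type.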