[ https://issues.apache.org/jira/browse/FLINK-6427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15992671#comment-15992671 ]
ASF GitHub Bot commented on FLINK-6427:
---------------------------------------
GitHub user aljoscha opened a pull request:
https://github.com/apache/flink/pull/3807
[FLINK-6427] Ensure file length is flushed in StreamWriterBase
StreamWriterBase is used with BucketingSink. With HDFS, it can happen that the NameNode does not update the file length if we flush but don't properly close the file. We now specify that we also want to flush the file length when flushing.
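The failure mode can be illustrated with a toy model. This is a minimal sketch under a deliberately simplified assumption: a flush pushes bytes out of the client, but the length reported by the (simulated) NameNode only advances on close() or on a sync that explicitly requests a length update. Real HDFS also updates it when blocks complete, which the model ignores. `ToyHdfsDemo`, `ToyHdfsFile` and its `SyncFlag` are hypothetical names that only mimic the shape of Hadoop's `hflush()`/`hsync(EnumSet<SyncFlag>)`. They are not the Hadoop API.

```java
import java.util.EnumSet;

/**
 * Hypothetical toy model of the stale-length problem; NOT the Hadoop API.
 * Simplification: the NameNode length only advances on close() or on a sync
 * that requests UPDATE_LENGTH (block completion is ignored).
 */
public class ToyHdfsDemo {

    /** Hypothetical stand-in for HdfsDataOutputStream.SyncFlag. */
    enum SyncFlag { UPDATE_LENGTH }

    static class ToyHdfsFile {
        private long bytesWritten;   // bytes handed to the client stream
        private long bytesFlushed;   // bytes pushed out to the simulated DataNodes
        private long nameNodeLength; // length the simulated NameNode reports

        void write(int n) { bytesWritten += n; }

        /** Plain flush: data leaves the client, NameNode length is untouched. */
        void hflush() { bytesFlushed = bytesWritten; }

        /** Flush, and update the NameNode length only if UPDATE_LENGTH is requested. */
        void hsync(EnumSet<SyncFlag> flags) {
            hflush();
            if (flags.contains(SyncFlag.UPDATE_LENGTH)) {
                nameNodeLength = bytesFlushed;
            }
        }

        /** A proper close always finalizes the length. */
        void close() {
            hflush();
            nameNodeLength = bytesFlushed;
        }

        long reportedLength() { return nameNodeLength; }
    }

    public static void main(String[] args) {
        // Job is cancelled after a plain flush, before close(): length stays stale.
        ToyHdfsFile cancelled = new ToyHdfsFile();
        cancelled.write(100);
        cancelled.hflush();
        System.out.println("plain flush, no close: " + cancelled.reportedLength()); // 0

        // Same sequence, but the flush also requests a length update.
        ToyHdfsFile synced = new ToyHdfsFile();
        synced.write(100);
        synced.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
        System.out.println("flush + UPDATE_LENGTH: " + synced.reportedLength()); // 100
    }
}
```

In this model the cancelled, flush-only file reports a length of 0 even though 100 bytes were flushed, while the file flushed with UPDATE_LENGTH reports 100. That gap is what requesting a length update on every flush is meant to close.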
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aljoscha/flink bucketing-sink-flush-file-length
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3807.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3807
----
commit dbc3899d5dbda642fdce0f06e9fece43dbc9585f
Author: Aljoscha Krettek
Date: 2017-05-02T09:31:40Z
[FLINK-6427] Ensure file length is flushed in StreamWriterBase
StreamWriterBase is used with BucketingSink. With HDFS, it can happen that the NameNode does not update the file length if we flush but don't properly close the file. We now specify that we also want to flush the file length when flushing.
----
> BucketingSink does not sync file length in case of cancel
> ---------------------------------------------------------
>
> Key: FLINK-6427
> URL: https://issues.apache.org/jira/browse/FLINK-6427
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.3.0, 1.2.1
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> This is from a discussion on the user mailing list:
> https://lists.apache.org/thread.html/917dbbbd7c8f48b33e7b470fa9ed382df61cf43cb5deb46becebf4b5@%3Cuser.flink.apache.org%3E
> A possible solution is to add this to {{StreamWriterBase.hflushOrSync()}}:
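> {code}
> import java.util.EnumSet;
> import org.apache.hadoop.hdfs.DFSOutputStream;
> import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
>
> // os is the FSDataOutputStream being flushed; only downcast if it really wraps an HDFS stream.
> if (os.getWrappedStream() instanceof DFSOutputStream) {
>     // Sync and also ask the NameNode to record the current file length.
>     ((DFSOutputStream) os.getWrappedStream())
>         .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> }
> {code}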
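The suggested fix only downcasts after an `instanceof` check on the wrapped stream. Streams that are not a `DFSOutputStream` (for example, when the sink writes to a non-HDFS file system) keep the plain flush instead of failing with a `ClassCastException`. The sketch below shows that guard pattern with plain `java.io` types only. `WrappedSyncDemo`, `LengthSyncable`, `WrapperStream` and `ToyDfsStream` are hypothetical stand-ins for `DFSOutputStream`'s length-updating `hsync()` and for `FSDataOutputStream.getWrappedStream()`. They are not Hadoop classes, and a plain `flush()` stands in for the HDFS flush.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical sketch of the "check the wrapped stream before downcasting" pattern. */
public class WrappedSyncDemo {

    /** Hypothetical capability, standing in for DFSOutputStream's length-updating hsync. */
    interface LengthSyncable {
        void syncWithLength() throws IOException;
    }

    /** Hypothetical wrapper, standing in for FSDataOutputStream.getWrappedStream(). */
    static class WrapperStream extends FilterOutputStream {
        WrapperStream(OutputStream out) { super(out); }

        OutputStream getWrappedStream() { return out; }
    }

    /** Toy "HDFS-like" stream that records whether a length sync was requested. */
    static class ToyDfsStream extends ByteArrayOutputStream implements LengthSyncable {
        boolean lengthSynced;

        @Override
        public void syncWithLength() { lengthSynced = true; }
    }

    /** Always flush; additionally request a length update only if the wrapped stream supports it. */
    static void flushOrSync(WrapperStream os) throws IOException {
        os.flush();
        if (os.getWrappedStream() instanceof LengthSyncable) {
            ((LengthSyncable) os.getWrappedStream()).syncWithLength();
        }
        // Any other wrapped stream just gets the plain flush;
        // an unconditional cast would throw ClassCastException here.
    }

    public static void main(String[] args) throws IOException {
        ToyDfsStream dfs = new ToyDfsStream();
        flushOrSync(new WrapperStream(dfs));
        System.out.println("DFS-like stream length-synced: " + dfs.lengthSynced);

        // A non-HDFS stream passes through without error.
        flushOrSync(new WrapperStream(new ByteArrayOutputStream()));
        System.out.println("plain stream flushed without error");
    }
}
```

Keeping the type check next to the cast means the HDFS-specific behavior is opt-in per stream, so other file systems see no change in behavior.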
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)