Repository: flink
Updated Branches:
  refs/heads/master 6181302f1 -> 6d0c4c340
[FLINK-6427] Ensure file length is flushed in StreamWriterBase

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d0c4c34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d0c4c34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d0c4c34

Branch: refs/heads/master
Commit: 6d0c4c340d0b052d2a97e7e86622707d05f6b6d7
Parents: 6181302
Author: Jürgen Thomann <[email protected]>
Authored: Wed May 3 13:06:04 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed May 3 13:46:10 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/connectors/fs/StreamWriterBase.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6d0c4c34/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index 140246f..a04e4b5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -21,12 +21,14 @@ import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.EnumSet;
 
 /**
  * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
@@ -70,6 +72,10 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
 				// At this point the refHflushOrSync cannot be null,
 				// since register method would have thrown if it was.
 				this.refHflushOrSync.invoke(os);
+
+				if (os instanceof HdfsDataOutputStream) {
+					((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+				}
 			} catch (InvocationTargetException e) {
 				String msg = "Error while trying to hflushOrSync!";
 				LOG.error(msg + " " + e.getCause());
