Repository: flink
Updated Branches:
  refs/heads/master 6181302f1 -> 6d0c4c340


[FLINK-6427] Ensure file length is flushed in StreamWriterBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d0c4c34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d0c4c34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d0c4c34

Branch: refs/heads/master
Commit: 6d0c4c340d0b052d2a97e7e86622707d05f6b6d7
Parents: 6181302
Author: Jürgen Thomann <[email protected]>
Authored: Wed May 3 13:06:04 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed May 3 13:46:10 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/connectors/fs/StreamWriterBase.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d0c4c34/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index 140246f..a04e4b5 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -21,12 +21,14 @@ import 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.EnumSet;
 
 /**
  * Base class for {@link Writer Writers} that write to a {@link 
FSDataOutputStream}.
@@ -70,6 +72,10 @@ public abstract class StreamWriterBase<T> implements 
Writer<T> {
                        // At this point the refHflushOrSync cannot be null,
                        // since register method would have thrown if it was.
                        this.refHflushOrSync.invoke(os);
+
+                       if (os instanceof HdfsDataOutputStream) {
+                               ((HdfsDataOutputStream) 
os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+                       }
                } catch (InvocationTargetException e) {
                        String msg = "Error while trying to hflushOrSync!";
                        LOG.error(msg + " " + e.getCause());

Reply via email to