[hotfix] Flush in CsvOutputFormat before closing, to increase CI stability

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d13a05d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d13a05d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d13a05d

Branch: refs/heads/master
Commit: 3d13a05d1b2354e027626db280f9bfce9070e570
Parents: 8e76322
Author: Stephan Ewen <[email protected]>
Authored: Wed Nov 23 15:37:05 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Nov 28 14:20:08 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/CsvOutputFormat.java      |  1 +
 .../flink/api/java/io/CsvOutputFormatTest.java  | 47 +++++++++++++-------
 2 files changed, 32 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index 703128f..c2fe13c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -165,6 +165,7 @@ public class CsvOutputFormat<T extends Tuple> extends 
FileOutputFormat<T> implem
        @Override
        public void close() throws IOException {
                if (wrt != null) {
+                       this.wrt.flush();
                        this.wrt.close();
                }
                super.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
index a9288c6..a8ce495 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -34,25 +35,30 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
 
+import static org.junit.Assert.fail;
+
 public class CsvOutputFormatTest {
 
        private String path = null;
-       private CsvOutputFormat<Tuple3<String, String, Integer>> 
csvOutputFormat;
 
        @Before
        public void createFile() throws Exception {
                path = 
File.createTempFile("csv_output_test_file",".csv").getAbsolutePath();
-               csvOutputFormat = new CsvOutputFormat<>(new Path(path));
        }
 
        @Test
        public void testNullAllow() throws Exception {
-               csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
-               
csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
-               csvOutputFormat.setAllowNullValues(true);
-               csvOutputFormat.open(0, 1);
-               csvOutputFormat.writeRecord(new Tuple3<String, String, 
Integer>("One", null, 8));
-               csvOutputFormat.close();
+               final CsvOutputFormat<Tuple3<String, String, Integer>> 
csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+               try {
+                       
csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+                       
csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+                       csvOutputFormat.setAllowNullValues(true);
+                       csvOutputFormat.open(0, 1);
+                       csvOutputFormat.writeRecord(new Tuple3<String, String, 
Integer>("One", null, 8));
+               }
+               finally {
+                       csvOutputFormat.close();
+               }
 
                java.nio.file.Path p = Paths.get(path);
                Assert.assertTrue(Files.exists(p));
@@ -61,19 +67,28 @@ public class CsvOutputFormatTest {
                Assert.assertEquals("One,,8", lines.get(0));
        }
 
-       @Test(expected = RuntimeException.class)
+       @Test
        public void testNullDisallowOnDefault() throws Exception {
-               csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
-               
csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
-               csvOutputFormat.open(0, 1);
-               csvOutputFormat.writeRecord(new Tuple3<String, String, 
Integer>("One", null, 8));
-               csvOutputFormat.close();
+               final CsvOutputFormat<Tuple3<String, String, Integer>> 
csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+               try {
+                       
csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+                       
csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+                       csvOutputFormat.open(0, 1);
+                       try {
+                               csvOutputFormat.writeRecord(new Tuple3<String, 
String, Integer>("One", null, 8));
+                               fail("should fail with an exception");
+                       } catch (RuntimeException e) {
+                               // expected
+                       }
+                       
+               }
+               finally {
+                       csvOutputFormat.close();
+               }
        }
 
        @After
        public void cleanUp() throws IOException {
-               csvOutputFormat.close();
                Files.deleteIfExists(Paths.get(path));
        }
-
 }

Reply via email to