[hotfix] Flush in CsvOutputFormat before closing, to increase CI stability
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d13a05d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d13a05d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d13a05d Branch: refs/heads/master Commit: 3d13a05d1b2354e027626db280f9bfce9070e570 Parents: 8e76322 Author: Stephan Ewen <[email protected]> Authored: Wed Nov 23 15:37:05 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Nov 28 14:20:08 2016 +0100 ---------------------------------------------------------------------- .../flink/api/java/io/CsvOutputFormat.java | 1 + .../flink/api/java/io/CsvOutputFormatTest.java | 47 +++++++++++++------- 2 files changed, 32 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java index 703128f..c2fe13c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java @@ -165,6 +165,7 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem @Override public void close() throws IOException { if (wrt != null) { + this.wrt.flush(); this.wrt.close(); } super.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java index a9288c6..a8ce495 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -34,25 +35,30 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; +import static org.junit.Assert.fail; + public class CsvOutputFormatTest { private String path = null; - private CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat; @Before public void createFile() throws Exception { path = File.createTempFile("csv_output_test_file",".csv").getAbsolutePath(); - csvOutputFormat = new CsvOutputFormat<>(new Path(path)); } @Test public void testNullAllow() throws Exception { - csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); - csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY); - csvOutputFormat.setAllowNullValues(true); - csvOutputFormat.open(0, 1); - csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8)); - csvOutputFormat.close(); + final CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path)); + try { + csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); + csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY); + csvOutputFormat.setAllowNullValues(true); + csvOutputFormat.open(0, 1); + csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8)); + } + finally { + csvOutputFormat.close(); + } java.nio.file.Path p = Paths.get(path); Assert.assertTrue(Files.exists(p)); @@ -61,19 +67,28 @@ public class CsvOutputFormatTest { Assert.assertEquals("One,,8", lines.get(0)); } - @Test(expected = RuntimeException.class) + @Test public void testNullDisallowOnDefault() throws Exception { - csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); - csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY); - csvOutputFormat.open(0, 1); - csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8)); - csvOutputFormat.close(); + final CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path)); + try { + csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); + csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY); + csvOutputFormat.open(0, 1); + try { + csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8)); + fail("should fail with an exception"); + } catch (RuntimeException e) { + // expected + } + + } + finally { + csvOutputFormat.close(); + } } @After public void cleanUp() throws IOException { - csvOutputFormat.close(); Files.deleteIfExists(Paths.get(path)); } - }
