Repository: flink Updated Branches: refs/heads/release-0.8 cd96caf36 -> 00117f7ff
[FLINK-1389] Allow changing the filenames of the files created when writing to a directory This closes #301 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00117f7f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00117f7f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00117f7f Branch: refs/heads/release-0.8 Commit: 00117f7ffc3e2bba30b5eb9aff25745d935775b4 Parents: cd96caf Author: Robert Metzger <rmetz...@apache.org> Authored: Mon Jan 12 16:40:34 2015 +0100 Committer: Robert Metzger <rmetz...@apache.org> Committed: Mon Jan 26 11:30:32 2015 +0100 ---------------------------------------------------------------------- .../flink/api/java/io/AvroOutputFormat.java | 7 +++- .../flink/api/avro/AvroOutputFormatTest.java | 5 +++ flink-addons/flink-tachyon/pom.xml | 6 +++ .../java/org/apache/flink/tachyon/HDFSTest.java | 41 +++++++++++++++++++ .../flink/api/common/io/FileOutputFormat.java | 18 +++++--- .../api/common/io/FileOutputFormatTest.java | 43 +++++++++++++++++++- 6 files changed, 111 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java index 08fe8d9..d00dbf7 100644 --- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java +++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java @@ -41,7 +41,6 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> { private transient DataFileWriter<E> dataFileWriter; - public AvroOutputFormat(Path filePath, Class<E> type) { super(filePath); this.avroValueType = type; @@ -51,6 +50,11 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> { this.avroValueType = type; } + @Override + protected String getDirectoryFileName(int taskNumber) { + return super.getDirectoryFileName(taskNumber) + ".avro"; + } + public void setSchema(Schema schema) { this.userDefinedSchema = schema; } @@ -63,6 +67,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> { @Override public void open(int taskNumber, int numTasks) throws IOException { super.open(taskNumber, numTasks); + DatumWriter<E> datumWriter; Schema schema = null; if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) { http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java index db235c0..42c1702 100644 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java @@ -84,12 +84,17 @@ public class AvroOutputFormatTest extends JavaProgramTestBase { File file1 = asFile(outputPath1); if (file1.isDirectory()) { output1 = file1.listFiles(); + // check for avro ext in dir. + for (File avroOutput : output1) { + Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro")); + } } else { output1 = new File[] {file1}; } List<String> result1 = new ArrayList<String>(); DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class); for (File avroOutput : output1) { + DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1); while (dataFileReader1.hasNext()) { User user = dataFileReader1.next(); http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-addons/flink-tachyon/pom.xml ---------------------------------------------------------------------- diff --git a/flink-addons/flink-tachyon/pom.xml b/flink-addons/flink-tachyon/pom.xml index 74640be..4ce31aa 100644 --- a/flink-addons/flink-tachyon/pom.xml +++ b/flink-addons/flink-tachyon/pom.xml @@ -49,6 +49,12 @@ under the License. <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.tachyonproject</groupId> <artifactId>tachyon</artifactId> <version>0.5.0</version> http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java index 8d29ea7..7318894 100644 --- a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java +++ b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java @@ -18,13 +18,19 @@ package org.apache.flink.tachyon; + import org.apache.commons.io.IOUtils; +import org.apache.flink.api.common.io.FileOutputFormat; +import org.apache.flink.api.java.io.AvroOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.examples.java.wordcount.WordCount; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; @@ -116,4 +122,39 @@ public class HDFSTest { Assert.fail("Error in test: " + e.getMessage() ); } } + + @Test + public void testAvroOut() { + String type = "one"; + AvroOutputFormat<String> avroOut = + new AvroOutputFormat<String>( String.class ); + + org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest"); + + avroOut.setOutputFilePath(new Path(result.toString())); + avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE); + avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS); + + try { + avroOut.open(0, 2); + avroOut.writeRecord(type); + avroOut.close(); + + avroOut.open(1, 2); + avroOut.writeRecord(type); + avroOut.close(); + + + Assert.assertTrue("No result file present", hdfs.exists(result)); + FileStatus[] files = hdfs.listStatus(result); + Assert.assertEquals(2, files.length); + for(FileStatus file : files) { + Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName())); + } + + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java index bc7ab73..924de7e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java @@ -89,7 +89,7 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali * The key under which the name of the target path is stored in the configuration. */ public static final String FILE_PARAMETER_KEY = "flink.output.file"; - + /** * The path of the file to be written. */ @@ -104,7 +104,7 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali * The output directory mode */ private OutputDirectoryMode outputDirectoryMode; - + // -------------------------------------------------------------------------------------------- /** The stream to which the data is written; */ @@ -161,7 +161,8 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali public OutputDirectoryMode getOutputDirectoryMode() { return this.outputDirectoryMode; } - + + // ---------------------------------------------------------------- @Override @@ -233,10 +234,11 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali } } } - - + + + // Suffix the path with the parallel instance index, if needed - this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + (taskNumber+1)) : p; + this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + getDirectoryFileName(taskNumber)) : p; // create output file this.stream = fs.create(this.actualFilePath, writeMode == WriteMode.OVERWRITE); @@ -245,6 +247,10 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali this.fileCreated = true; } + protected String getDirectoryFileName(int taskNumber) { + return Integer.toString(taskNumber + 1); + } + @Override public void close() throws IOException { final FSDataOutputStream s = this.stream; http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java index 6cdf731..43ded56 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java @@ -144,8 +144,38 @@ public class FileOutputFormatTest { Assert.assertTrue(!exception); Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory()); Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile()); - + + // check custom file name inside directory if directory exists + (new File(tmpOutPath.getAbsoluteFile()+"/1")).delete(); + dfof = new DummyFileOutputFormat(); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.NO_OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS); + dfof.testFileName = true; + Configuration c = new Configuration(); + dfof.configure(c); + + exception = false; + try { + dfof.open(0, 1); + dfof.close(); + } catch (Exception e) { + exception = true; + } + File customOutFile = new File(tmpOutPath.getAbsolutePath()+"/fancy-1-0.avro"); + Assert.assertTrue(!exception); + Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory()); + Assert.assertTrue(customOutFile.exists() && customOutFile.isFile()); + customOutFile.delete(); + // check fail if file in directory exists + // create file for test + customOutFile = new File(tmpOutPath.getAbsolutePath()+"/1"); + try { + customOutFile.createNewFile(); + } catch (IOException e) { + Assert.fail("Error creating file"); + } dfof = new DummyFileOutputFormat(); dfof.setOutputFilePath(new Path(tmpFilePath)); dfof.setWriteMode(WriteMode.NO_OVERWRITE); @@ -562,11 +592,20 @@ public class FileOutputFormatTest { public static class DummyFileOutputFormat extends FileOutputFormat<IntValue> { private static final long serialVersionUID = 1L; - + public boolean testFileName = false; @Override public void writeRecord(IntValue record) throws IOException { // DO NOTHING } + + @Override + protected String getDirectoryFileName(int taskNumber) { + if(testFileName) { + return "fancy-"+(taskNumber+1)+"-"+taskNumber+".avro"; + } else { + return super.getDirectoryFileName(taskNumber); + } + } } }