Repository: flink
Updated Branches:
  refs/heads/release-0.8 cd96caf36 -> 00117f7ff


[FLINK-1389] Allow changing the filenames of the files created when writing to a directory

This closes #301


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00117f7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00117f7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00117f7f

Branch: refs/heads/release-0.8
Commit: 00117f7ffc3e2bba30b5eb9aff25745d935775b4
Parents: cd96caf
Author: Robert Metzger <rmetz...@apache.org>
Authored: Mon Jan 12 16:40:34 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Mon Jan 26 11:30:32 2015 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/AvroOutputFormat.java     |  7 +++-
 .../flink/api/avro/AvroOutputFormatTest.java    |  5 +++
 flink-addons/flink-tachyon/pom.xml              |  6 +++
 .../java/org/apache/flink/tachyon/HDFSTest.java | 41 +++++++++++++++++++
 .../flink/api/common/io/FileOutputFormat.java   | 18 +++++---
 .../api/common/io/FileOutputFormatTest.java     | 43 +++++++++++++++++++-
 6 files changed, 111 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
 
b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 08fe8d9..d00dbf7 100644
--- 
a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ 
b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -41,7 +41,6 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> {
 
        private transient DataFileWriter<E> dataFileWriter;
 
-
        public AvroOutputFormat(Path filePath, Class<E> type) {
                super(filePath);
                this.avroValueType = type;
@@ -51,6 +50,11 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> 
{
                this.avroValueType = type;
        }
 
+       @Override
+       protected String getDirectoryFileName(int taskNumber) {
+               return super.getDirectoryFileName(taskNumber) + ".avro";
+       }
+
        public void setSchema(Schema schema) {
                this.userDefinedSchema = schema;
        }
@@ -63,6 +67,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> {
        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
                super.open(taskNumber, numTasks);
+
                DatumWriter<E> datumWriter;
                Schema schema = null;
                if 
(org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType))
 {

http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index db235c0..42c1702 100644
--- 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -84,12 +84,17 @@ public class AvroOutputFormatTest extends 
JavaProgramTestBase {
                File file1 = asFile(outputPath1);
                if (file1.isDirectory()) {
                        output1 = file1.listFiles();
+                       // check for avro ext in dir.
+                       for (File avroOutput : output1) {
+                               Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
+                       }
                } else {
                        output1 = new File[] {file1};
                }
                List<String> result1 = new ArrayList<String>();
                DatumReader<User> userDatumReader1 = new 
SpecificDatumReader<User>(User.class);
                for (File avroOutput : output1) {
+
                        DataFileReader<User> dataFileReader1 = new 
DataFileReader<User>(avroOutput, userDatumReader1);
                        while (dataFileReader1.hasNext()) {
                                User user = dataFileReader1.next();

http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-addons/flink-tachyon/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-tachyon/pom.xml 
b/flink-addons/flink-tachyon/pom.xml
index 74640be..4ce31aa 100644
--- a/flink-addons/flink-tachyon/pom.xml
+++ b/flink-addons/flink-tachyon/pom.xml
@@ -49,6 +49,12 @@ under the License.
                        <scope>test</scope>
                </dependency>
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
                        <groupId>org.tachyonproject</groupId>
                        <artifactId>tachyon</artifactId>
                        <version>0.5.0</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
 
b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
index 8d29ea7..7318894 100644
--- 
a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ 
b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -18,13 +18,19 @@
 
 package org.apache.flink.tachyon;
 
+
 import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
@@ -116,4 +122,39 @@ public class HDFSTest {
                        Assert.fail("Error in test: " + e.getMessage() );
                }
        }
+
+       @Test
+       public void testAvroOut() {
+               String type = "one";
+               AvroOutputFormat<String> avroOut =
+                               new AvroOutputFormat<String>( String.class );
+
+               org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
+
+               avroOut.setOutputFilePath(new Path(result.toString()));
+               avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
+               avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
+
+               try {
+                       avroOut.open(0, 2);
+                       avroOut.writeRecord(type);
+                       avroOut.close();
+
+                       avroOut.open(1, 2);
+                       avroOut.writeRecord(type);
+                       avroOut.close();
+
+
+                       Assert.assertTrue("No result file present", hdfs.exists(result));
+                       FileStatus[] files = hdfs.listStatus(result);
+                       Assert.assertEquals(2, files.length);
+                       for(FileStatus file : files) {
+                               Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
+                       }
+
+               } catch (IOException e) {
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index bc7ab73..924de7e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -89,7 +89,7 @@ public abstract class FileOutputFormat<IT> implements 
OutputFormat<IT>, Initiali
         * The key under which the name of the target path is stored in the 
configuration. 
         */
        public static final String FILE_PARAMETER_KEY = "flink.output.file";
-       
+
        /**
         * The path of the file to be written.
         */
@@ -104,7 +104,7 @@ public abstract class FileOutputFormat<IT> implements 
OutputFormat<IT>, Initiali
         * The output directory mode
         */
        private OutputDirectoryMode outputDirectoryMode;
-       
+
        // 
--------------------------------------------------------------------------------------------
        
        /** The stream to which the data is written; */
@@ -161,7 +161,8 @@ public abstract class FileOutputFormat<IT> implements 
OutputFormat<IT>, Initiali
        public OutputDirectoryMode getOutputDirectoryMode() {
                return this.outputDirectoryMode;
        }
-       
+
+
        // ----------------------------------------------------------------
 
        @Override
@@ -233,10 +234,11 @@ public abstract class FileOutputFormat<IT> implements 
OutputFormat<IT>, Initiali
                                }
                        }
                }
-                       
-                       
+
+
+
                // Suffix the path with the parallel instance index, if needed
-               this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + (taskNumber+1)) : p;
+               this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + getDirectoryFileName(taskNumber)) : p;
 
                // create output file
                this.stream = fs.create(this.actualFilePath, writeMode == 
WriteMode.OVERWRITE);
@@ -245,6 +247,10 @@ public abstract class FileOutputFormat<IT> implements 
OutputFormat<IT>, Initiali
                this.fileCreated = true;
        }
 
+       protected String getDirectoryFileName(int taskNumber) {
+               return Integer.toString(taskNumber + 1);
+       }
+
        @Override
        public void close() throws IOException {
                final FSDataOutputStream s = this.stream;

http://git-wip-us.apache.org/repos/asf/flink/blob/00117f7f/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
index 6cdf731..43ded56 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
@@ -144,8 +144,38 @@ public class FileOutputFormatTest {
                Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-               
+
+               // check custom file name inside directory if directory exists
+               (new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
+               dfof = new DummyFileOutputFormat();
+               dfof.setOutputFilePath(new Path(tmpFilePath));
+               dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+               dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+               dfof.testFileName = true;
+               Configuration c = new Configuration();
+               dfof.configure(c);
+
+               exception = false;
+               try {
+                       dfof.open(0, 1);
+                       dfof.close();
+               } catch (Exception e) {
+                       exception = true;
+               }
+               File customOutFile = new File(tmpOutPath.getAbsolutePath()+"/fancy-1-0.avro");
+               Assert.assertTrue(!exception);
+               Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
+               Assert.assertTrue(customOutFile.exists() && customOutFile.isFile());
+               customOutFile.delete();
+
                // check fail if file in directory exists
+               // create file for test
+               customOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+               try {
+                       customOutFile.createNewFile();
+               } catch (IOException e) {
+                       Assert.fail("Error creating file");
+               }
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -562,11 +592,20 @@ public class FileOutputFormatTest {
        
        public static class DummyFileOutputFormat extends 
FileOutputFormat<IntValue> {
                private static final long serialVersionUID = 1L;
-
+               public boolean testFileName = false;
                @Override
                public void writeRecord(IntValue record) throws IOException {
                        // DO NOTHING
                }
+
+               @Override
+               protected String getDirectoryFileName(int taskNumber) {
+                       if(testFileName) {
+                               return "fancy-"+(taskNumber+1)+"-"+taskNumber+".avro";
+                       } else {
+                               return super.getDirectoryFileName(taskNumber);
+                       }
+               }
        }
        
 }

Reply via email to