Repository: incubator-apex-malhar Updated Branches: refs/heads/master 422f5d946 -> 682c7ba59
1. Marking BytesFileOutputOperator as @evolving 2. Added japicmp exclusion for this class 3. Added Converter field for plugging-in custom conversion to byte[] 4. Provided default converters for no-op, String 5. Removed input port stringInput 6. Added javadocs 7. Renamed BytesFileOutputOperator -> GenericFileOutputOperator 8. Introduced BytesFileOutputOperator, StringFileOutputOperator Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/682c7ba5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/682c7ba5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/682c7ba5 Branch: refs/heads/master Commit: 682c7ba59105671415e7e115333ae41a4f2f0942 Parents: 422f5d9 Author: yogidevendra <[email protected]> Authored: Thu Jun 2 16:24:12 2016 +0530 Committer: yogidevendra <[email protected]> Committed: Fri Jun 3 15:48:51 2016 +0530 ---------------------------------------------------------------------- .../malhar/lib/fs/BytesFileOutputOperator.java | 295 ---------------- .../lib/fs/GenericFileOutputOperator.java | 340 +++++++++++++++++++ .../lib/fs/BytesFileOutputOperatorTest.java | 151 -------- .../lib/fs/GenericFileOutputOperatorTest.java | 153 +++++++++ pom.xml | 1 + 5 files changed, 494 insertions(+), 446 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/682c7ba5/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java deleted file mode 100644 index 9567872..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java +++ /dev/null @@ -1,295 
+0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.apex.malhar.lib.fs; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.concurrent.ExecutionException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.AutoMetric; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.StreamCodec; -import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator; -import com.datatorrent.netlet.util.DTThrowable; - -/** - * This class is responsible for writing tuples to HDFS. All tuples are written - * to the same file. Rolling file based on size, no. of tuples, idle windows, - * elapsed windows is supported. - * - * @since 3.4.0 - */ - -public class BytesFileOutputOperator extends AbstractSingleFileOutputOperator<byte[]> -{ - - /** - * Flag to mark if new data in current application window - */ - private transient boolean isNewDataInCurrentWindow; - - /** - * Separator between the tuples - */ - private String tupleSeparator; - - /** - * byte[] representation of tupleSeparator - */ - private transient byte[] tupleSeparatorBytes; - - /** - * No. 
of bytes received in current application window - */ - @AutoMetric - private long byteCount; - - /** - * No. of tuples present in current part for file - */ - private long currentPartTupleCount; - - /** - * Max. number of tuples allowed per part. Part file will be finalized after - * these many tuples - */ - private long maxTupleCount = Long.MAX_VALUE; - - /** - * No. of windows since last new data received - */ - private long currentPartIdleWindows; - - /** - * Max number of idle windows for which no new data is added to current part - * file. Part file will be finalized after these many idle windows after last - * new data. - */ - private long maxIdleWindows = Long.MAX_VALUE; - - /** - * Stream codec for string input port - */ - protected StreamCodec<String> stringStreamCodec; - - /** - * Default value for stream expiry - */ - private static final long DEFAULT_STREAM_EXPIRY_ACCESS_MILL = 60 * 60 * 1000L; //1 hour - - /** - * Default value for rotation windows - */ - private static final int DEFAULT_ROTATION_WINDOWS = 2 * 60 * 10; //10 min - - /** - * Initializing default values for tuple separator, stream expiry, rotation - * windows - */ - public BytesFileOutputOperator() - { - setTupleSeparator(System.getProperty("line.separator")); - setExpireStreamAfterAccessMillis(DEFAULT_STREAM_EXPIRY_ACCESS_MILL); - setRotationWindows(DEFAULT_ROTATION_WINDOWS); - } - - /** - * Input port for receiving string tuples. - */ - public final transient DefaultInputPort<String> stringInput = new DefaultInputPort<String>() - { - @Override - public void process(String tuple) - { - processTuple(tuple.getBytes()); - } - - @Override - public StreamCodec<String> getStreamCodec() - { - if (BytesFileOutputOperator.this.stringStreamCodec == null) { - return super.getStreamCodec(); - } else { - return stringStreamCodec; - } - } - }; - - /** - * {@inheritDoc} - * - * @return byte[] representation of the given tuple. if input tuple is of type - * byte[] then it is returned as it is. 
for any other type toString() - * representation is used to generate byte[]. - */ - @Override - protected byte[] getBytesForTuple(byte[] tuple) - { - ByteArrayOutputStream bytesOutStream = new ByteArrayOutputStream(); - - try { - bytesOutStream.write(tuple); - bytesOutStream.write(tupleSeparatorBytes); - byteCount += bytesOutStream.size(); - return bytesOutStream.toByteArray(); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - try { - bytesOutStream.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - /** - * Initializing per window level fields {@inheritDoc} - */ - @Override - public void beginWindow(long windowId) - { - super.beginWindow(windowId); - byteCount = 0; - isNewDataInCurrentWindow = false; - } - - /** - * {@inheritDoc} Does additional state maintenance for rollover - */ - @Override - protected void processTuple(byte[] tuple) - { - super.processTuple(tuple); - isNewDataInCurrentWindow = true; - - if (++currentPartTupleCount == maxTupleCount) { - rotateCall(getPartitionedFileName()); - } - } - - /** - * {@inheritDoc} Does additional checks if file should be rolled over for this - * window. 
- */ - @Override - public void endWindow() - { - super.endWindow(); - - if (!isNewDataInCurrentWindow) { - ++currentPartIdleWindows; - } else { - currentPartIdleWindows = 0; - } - - if (checkEndWindowFinalization()) { - rotateCall(getPartitionedFileName()); - } - } - - /** - * Rollover check at the endWindow - */ - private boolean checkEndWindowFinalization() - { - if ((currentPartIdleWindows == maxIdleWindows)) { - return true; - } - return false; - } - - /** - * {@inheritDoc} Handles file rotation along with exception handling - * - * @param lastFile - */ - protected void rotateCall(String lastFile) - { - try { - this.rotate(lastFile); - currentPartIdleWindows = 0; - currentPartTupleCount = 0; - } catch (IOException ex) { - LOG.error("Exception in file rotation", ex); - DTThrowable.rethrow(ex); - } catch (ExecutionException ex) { - LOG.error("Exception in file rotation", ex); - DTThrowable.rethrow(ex); - } - } - - - /** - * @return Separator between the tuples - */ - public String getTupleSeparator() - { - return tupleSeparator; - } - - /** - * @param separator - * Separator between the tuples - */ - public void setTupleSeparator(String separator) - { - this.tupleSeparator = separator; - this.tupleSeparatorBytes = separator.getBytes(); - } - - /** - * @return max tuples in a part file - */ - public long getMaxTupleCount() - { - return maxTupleCount; - } - - /** - * @param maxTupleCount - * max tuples in a part file - */ - public void setMaxTupleCount(long maxTupleCount) - { - this.maxTupleCount = maxTupleCount; - } - - /** - * @return max number of idle windows for rollover - */ - public long getMaxIdleWindows() - { - return maxIdleWindows; - } - - /** - * @param maxIdleWindows max number of idle windows for rollover - */ - public void setMaxIdleWindows(long maxIdleWindows) - { - this.maxIdleWindows = maxIdleWindows; - } - - private static final Logger LOG = LoggerFactory.getLogger(BytesFileOutputOperator.class); -} 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/682c7ba5/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java new file mode 100644 index 0000000..017a890 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.converter.Converter; +import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * This class is responsible for writing tuples to HDFS. 
All tuples are written + * to the same file. Rolling file based on size, no. of tuples, idle windows, + * elapsed windows is supported. + * + * @since 3.4.0 + */ [email protected] +public abstract class GenericFileOutputOperator<INPUT> extends AbstractSingleFileOutputOperator<INPUT> +{ + + /** + * Flag to mark if new data in current application window + */ + private transient boolean isNewDataInCurrentWindow; + + /** + * Separator between the tuples + */ + private String tupleSeparator; + + /** + * byte[] representation of tupleSeparator + */ + private transient byte[] tupleSeparatorBytes; + + /** + * No. of bytes received in current application window + */ + @AutoMetric + private long byteCount; + + /** + * No. of tuples present in current part for file + */ + private long currentPartTupleCount; + + /** + * Max. number of tuples allowed per part. Part file will be finalized after + * these many tuples + */ + private long maxTupleCount = Long.MAX_VALUE; + + /** + * No. of windows since last new data received + */ + private long currentPartIdleWindows; + + /** + * Converter for conversion of input tuples to byte[] + */ + @NotNull + private Converter<INPUT, byte[]> converter; + + /** + * Max number of idle windows for which no new data is added to current part + * file. Part file will be finalized after these many idle windows after last + * new data. 
+ */ + private long maxIdleWindows = Long.MAX_VALUE; + + /** + * Stream codec for string input port + */ + protected StreamCodec<String> stringStreamCodec; + + /** + * Default value for stream expiry + */ + private static final long DEFAULT_STREAM_EXPIRY_ACCESS_MILL = 60 * 60 * 1000L; //1 hour + + /** + * Default value for rotation windows + */ + private static final int DEFAULT_ROTATION_WINDOWS = 2 * 60 * 10; //10 min + + /** + * Initializing default values for tuple separator, stream expiry, rotation + * windows + */ + public GenericFileOutputOperator() + { + setTupleSeparator(System.getProperty("line.separator")); + setExpireStreamAfterAccessMillis(DEFAULT_STREAM_EXPIRY_ACCESS_MILL); + setRotationWindows(DEFAULT_ROTATION_WINDOWS); + } + + /** + * {@inheritDoc} + * + * @return byte[] representation of the given tuple. if input tuple is of type + * byte[] then it is returned as it is. for any other type toString() + * representation is used to generate byte[]. + */ + @Override + protected byte[] getBytesForTuple(INPUT tuple) + { + ByteArrayOutputStream bytesOutStream = new ByteArrayOutputStream(); + + try { + bytesOutStream.write(converter.convert(tuple)); + bytesOutStream.write(tupleSeparatorBytes); + byteCount += bytesOutStream.size(); + return bytesOutStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + bytesOutStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Initializing per window level fields {@inheritDoc} + */ + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + byteCount = 0; + isNewDataInCurrentWindow = false; + } + + /** + * {@inheritDoc} Does additional state maintenance for rollover + */ + @Override + protected void processTuple(INPUT tuple) + { + super.processTuple(tuple); + isNewDataInCurrentWindow = true; + + if (++currentPartTupleCount == maxTupleCount) { + rotateCall(getPartitionedFileName()); + } + } + + 
/** + * {@inheritDoc} Does additional checks if file should be rolled over for this + * window. + */ + @Override + public void endWindow() + { + super.endWindow(); + + if (!isNewDataInCurrentWindow) { + ++currentPartIdleWindows; + } else { + currentPartIdleWindows = 0; + } + + if (checkEndWindowFinalization()) { + rotateCall(getPartitionedFileName()); + } + } + + /** + * Rollover check at the endWindow + */ + private boolean checkEndWindowFinalization() + { + if ((currentPartIdleWindows == maxIdleWindows)) { + return true; + } + return false; + } + + /** + * {@inheritDoc} Handles file rotation along with exception handling + * + * @param lastFile + */ + protected void rotateCall(String lastFile) + { + try { + this.rotate(lastFile); + currentPartIdleWindows = 0; + currentPartTupleCount = 0; + } catch (IOException ex) { + LOG.error("Exception in file rotation", ex); + DTThrowable.rethrow(ex); + } catch (ExecutionException ex) { + LOG.error("Exception in file rotation", ex); + DTThrowable.rethrow(ex); + } + } + + /** + * @return Separator between the tuples + */ + public String getTupleSeparator() + { + return tupleSeparator; + } + + /** + * @param separator + * Separator between the tuples + */ + public void setTupleSeparator(String separator) + { + this.tupleSeparator = separator; + this.tupleSeparatorBytes = separator.getBytes(); + } + + /** + * @return max tuples in a part file + */ + public long getMaxTupleCount() + { + return maxTupleCount; + } + + /** + * @param maxTupleCount + * max tuples in a part file + */ + public void setMaxTupleCount(long maxTupleCount) + { + this.maxTupleCount = maxTupleCount; + } + + /** + * @return max number of idle windows for rollover + */ + public long getMaxIdleWindows() + { + return maxIdleWindows; + } + + /** + * @param maxIdleWindows + * max number of idle windows for rollover + */ + public void setMaxIdleWindows(long maxIdleWindows) + { + this.maxIdleWindows = maxIdleWindows; + } + + /** + * Converter for conversion of input 
tuples to byte[] + * @return converter + */ + public Converter<INPUT, byte[]> getConverter() + { + return converter; + } + + /** + * Converter for conversion of input tuples to byte[] + * @param converter + */ + public void setConverter(Converter<INPUT, byte[]> converter) + { + this.converter = converter; + } + + /** + * Converter returning input tuples as byte[] without any conversion + */ + public static class NoOpConverter implements Converter<byte[], byte[]> + { + @Override + public byte[] convert(byte[] tuple) + { + return tuple; + } + } + + public static class BytesFileOutputOperator extends GenericFileOutputOperator<byte[]> + { + + public BytesFileOutputOperator() + { + setConverter(new NoOpConverter()); + } + } + + /** + * Converter returning byte[] conversion of the input String. + */ + public static class StringToBytesConverter implements Converter<String, byte[]> + { + @Override + public byte[] convert(String tuple) + { + return ((String)tuple).getBytes(); + } + } + + public static class StringFileOutputOperator extends GenericFileOutputOperator<String> + { + public StringFileOutputOperator() + { + setConverter(new StringToBytesConverter()); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(GenericFileOutputOperator.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/682c7ba5/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java deleted file mode 100644 index 1ea7352..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.apex.malhar.lib.fs; - -import java.io.File; -import java.io.IOException; -import java.util.Collection; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.commons.io.FileUtils; - -import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest; -import com.datatorrent.netlet.util.DTThrowable; - -public class BytesFileOutputOperatorTest extends AbstractFileOutputOperatorTest -{ - - /** - * Test file rollover in case of idle windows - * - * @throws IOException - */ - @Test - public void testIdleWindowsFinalize() throws IOException - { - BytesFileOutputOperator writer = new BytesFileOutputOperator(); - writer.setOutputFileName("output.txt"); - writer.setFilePath(testMeta.getDir()); - writer.setAlwaysWriteToTmp(true); - writer.setMaxIdleWindows(5); - writer.setup(testMeta.testOperatorContext); - - String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {}, {}, {}, {"6a", "6b" }, {"7a", "7b" }, {}, {}, {}, - {}, {}, {"13a", "13b" }, {"14a", "14b" }, {}, {}, {}, {"18a", "18b" }, {"19a", "19b" }, {}, {}, {}, {}, {}, - {}, {"26a", "26b"} }; - - for (int i = 0; i <= 12; i++) { - writer.beginWindow(i); - for (String t : tuples[i]) { - writer.stringInput.put(t); - } - writer.endWindow(); - } - writer.committed(10); - - for (int i = 13; i 
<= 26; i++) { - writer.beginWindow(i); - for (String t : tuples[i]) { - writer.stringInput.put(t); - } - writer.endWindow(); - } - writer.committed(20); - writer.committed(26); - - String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", "13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n", - "26a\n26b\n" }; - - for (int i = 0; i < expected.length; i++) { - checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true); - } - } - - /** - * Test file rollover for tuple count - * - * @throws IOException - */ - @Test - public void testTupleCountFinalize() throws IOException - { - BytesFileOutputOperator writer = new BytesFileOutputOperator(); - writer.setOutputFileName("output.txt"); - writer.setFilePath(testMeta.getDir()); - writer.setAlwaysWriteToTmp(true); - writer.setMaxTupleCount(10); - writer.setup(testMeta.testOperatorContext); - - String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {"3a", "3b" }, {"4a", "4b" }, {}, {"6a", "6b" }, - {"7a", "7b" }, {}, {}, {"9a" }, {"10a", "10b" }, {}, {"12a" }, {"13a", "13b"}, {"14a", "14b" }, {}, {}, - {}, {"18a", "18b" }, {"19a", "19b" }, {"20a" }, {"21a" }, {"22a" }}; - - for (int i = 0; i < tuples.length; i++) { - writer.beginWindow(i); - for (String t : tuples[i]) { - writer.stringInput.put(t); - } - writer.endWindow(); - if (i % 10 == 0) { - writer.committed(10); - } - } - writer.committed(tuples.length); - - String[] expected = {"0a\n0b\n1a\n1b\n3a\n3b\n4a\n4b\n6a\n6b\n", "7a\n7b\n9a\n10a\n10b\n12a\n13a\n13b\n14a\n14b\n", - "18a\n18b\n19a\n19b\n20a\n21a\n22a\n" }; - - for (int i = 0; i < expected.length; i++) { - checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true); - } - } - - public static void checkOutput(int fileCount, String baseFilePath, String expectedOutput, boolean checkTmp) - { - if (fileCount >= 0) { - baseFilePath += "." 
+ fileCount; - } - - File file = new File(baseFilePath); - - if (!file.exists()) { - String[] extensions = {"tmp"}; - Collection<File> tmpFiles = FileUtils.listFiles(file.getParentFile(), extensions, false); - for (File tmpFile : tmpFiles) { - if (file.getPath().startsWith(baseFilePath)) { - file = tmpFile; - break; - } - } - } - - String fileContents = null; - - try { - fileContents = FileUtils.readFileToString(file); - } catch (IOException ex) { - DTThrowable.rethrow(ex); - } - - Assert.assertEquals("Single file " + fileCount + " output contents", expectedOutput, fileContents); - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/682c7ba5/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java new file mode 100644 index 0000000..52d5c5a --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.BytesFileOutputOperator; +import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator; +import org.apache.commons.io.FileUtils; + +import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest; +import com.datatorrent.netlet.util.DTThrowable; + +public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTest +{ + + /** + * Test file rollover in case of idle windows + * + * @throws IOException + */ + @Test + public void testIdleWindowsFinalize() throws IOException + { + StringFileOutputOperator writer = new StringFileOutputOperator(); + writer.setOutputFileName("output.txt"); + writer.setFilePath(testMeta.getDir()); + writer.setAlwaysWriteToTmp(true); + writer.setMaxIdleWindows(5); + writer.setup(testMeta.testOperatorContext); + + String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {}, {}, {}, {"6a", "6b" }, {"7a", "7b" }, {}, {}, {}, + {}, {}, {"13a", "13b" }, {"14a", "14b" }, {}, {}, {}, {"18a", "18b" }, {"19a", "19b" }, {}, {}, {}, {}, {}, + {}, {"26a", "26b"} }; + + for (int i = 0; i <= 12; i++) { + writer.beginWindow(i); + for (String t : tuples[i]) { + writer.input.put(t); + } + writer.endWindow(); + } + writer.committed(10); + + for (int i = 13; i <= 26; i++) { + writer.beginWindow(i); + for (String t : tuples[i]) { + writer.input.put(t); + } + writer.endWindow(); + } + writer.committed(20); + writer.committed(26); + + String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", "13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n", + "26a\n26b\n" }; + + for (int i = 0; i < expected.length; i++) { + checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true); + } + 
} + + /** + * Test file rollover for tuple count + * + * @throws IOException + */ + @Test + public void testTupleCountFinalize() throws IOException + { + BytesFileOutputOperator writer = new BytesFileOutputOperator(); + writer.setOutputFileName("output.txt"); + writer.setFilePath(testMeta.getDir()); + writer.setAlwaysWriteToTmp(true); + writer.setMaxTupleCount(10); + writer.setup(testMeta.testOperatorContext); + + String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {"3a", "3b" }, {"4a", "4b" }, {}, {"6a", "6b" }, + {"7a", "7b" }, {}, {}, {"9a" }, {"10a", "10b" }, {}, {"12a" }, {"13a", "13b"}, {"14a", "14b" }, {}, {}, + {}, {"18a", "18b" }, {"19a", "19b" }, {"20a" }, {"21a" }, {"22a" }}; + + for (int i = 0; i < tuples.length; i++) { + writer.beginWindow(i); + for (String t : tuples[i]) { + writer.input.put(t.getBytes()); + } + writer.endWindow(); + if (i % 10 == 0) { + writer.committed(10); + } + } + writer.committed(tuples.length); + + String[] expected = {"0a\n0b\n1a\n1b\n3a\n3b\n4a\n4b\n6a\n6b\n", "7a\n7b\n9a\n10a\n10b\n12a\n13a\n13b\n14a\n14b\n", + "18a\n18b\n19a\n19b\n20a\n21a\n22a\n" }; + + for (int i = 0; i < expected.length; i++) { + checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true); + } + } + + public static void checkOutput(int fileCount, String baseFilePath, String expectedOutput, boolean checkTmp) + { + if (fileCount >= 0) { + baseFilePath += "." 
+ fileCount; + } + + File file = new File(baseFilePath); + + if (!file.exists()) { + String[] extensions = {"tmp"}; + Collection<File> tmpFiles = FileUtils.listFiles(file.getParentFile(), extensions, false); + for (File tmpFile : tmpFiles) { + if (file.getPath().startsWith(baseFilePath)) { + file = tmpFile; + break; + } + } + } + + String fileContents = null; + + try { + fileContents = FileUtils.readFileToString(file); + } catch (IOException ex) { + DTThrowable.rethrow(ex); + } + + Assert.assertEquals("Single file " + fileCount + " output contents", expectedOutput, fileContents); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/682c7ba5/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 707db06..3f1b6ee 100644 --- a/pom.xml +++ b/pom.xml @@ -134,6 +134,7 @@ <excludes> <exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude> <exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude> + <exclude>org.apache.apex.malhar.lib.fs.BytesFileOutputOperator</exclude> </excludes> </parameter> <skip>${semver.plugin.skip}</skip>
