Repository: apex-malhar Updated Branches: refs/heads/master 3b8135061 -> a4551b42f
APEXMALHAR-2476-Fix-tupleSeperator-override. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a4551b42 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a4551b42 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a4551b42 Branch: refs/heads/master Commit: a4551b42fbcdffa615937c314a918af11e581674 Parents: 3b81350 Author: yogidevendra <[email protected]> Authored: Thu Apr 6 15:40:07 2017 -0700 Committer: yogidevendra <[email protected]> Committed: Fri Apr 21 10:49:00 2017 +0530 ---------------------------------------------------------------------- .../lib/fs/GenericFileOutputOperator.java | 8 ++++ .../lib/fs/GenericFileOutputOperatorTest.java | 48 ++++++++++++++++++++ 2 files changed, 56 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a4551b42/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java index 2ff405d..111ea7b 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.StreamCodec; import com.datatorrent.lib.converter.Converter; import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator; @@ -122,6 +123,13 @@ public class GenericFileOutputOperator<INPUT> extends AbstractSingleFileOutputOp setRotationWindows(DEFAULT_ROTATION_WINDOWS); } + @Override + public void setup(OperatorContext context) + { + super.setup(context); + this.tupleSeparatorBytes = tupleSeparator.getBytes(); + } + /** * {@inheritDoc} * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a4551b42/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java index f0d2915..f248266 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java @@ -29,7 +29,11 @@ import org.junit.Test; import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.BytesFileOutputOperator; import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest; import com.datatorrent.netlet.util.DTThrowable; @@ -127,6 +131,50 @@ public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTes } } + public static class TestApplication implements StreamingApplication + { + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + LineByLineFileInputOperator input = dag.addOperator("input", new LineByLineFileInputOperator()); + StringFileOutputOperator output = dag.addOperator("output", new StringFileOutputOperator()); + dag.addStream("data", input.output, output.input); + } + } + + @Test + public void runTestApplication() throws Exception + { + FileUtils.write(new File(testMeta.getDir(), "input.txt"), "a\nb\nc\nd\n"); + + Configuration conf = new Configuration(false); + conf.set("dt.operator.input.prop.directory", testMeta.getDir() + "/input.txt"); + conf.set("dt.operator.output.prop.filePath", testMeta.getDir()); + conf.set("dt.operator.output.prop.outputFileName", "output.txt"); + conf.set("dt.operator.output.prop.tupleSeparator", "-"); + conf.set("dt.operator.output.prop.maxIdleWindows", "2"); + conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT", "2"); + + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(new TestApplication(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + File outputFile = new File(testMeta.getDir(), "output.txt_2.0"); + final int MAX = 60; + for (int i = 0; i < MAX && (!outputFile.exists()); ++i) { + Thread.sleep(1000); + } + if (!outputFile.exists()) { + String msg = String.format("Error: output file not found after %d seconds%n", MAX); + throw new RuntimeException(msg); + } + + String output = FileUtils.readFileToString(outputFile); + Assert.assertEquals("a-b-c-d-", output); + } + @Test public void testRotationWithNoData() throws InterruptedException {
