Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 3d0166bfe -> fdc91fa3f
MLHR-1858 #comment File input operator that emits the lines of the file Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c85a4180 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c85a4180 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c85a4180 Branch: refs/heads/devel-3 Commit: c85a4180dbdfc01f224e6fa3ffb37f62aeb1a1c5 Parents: f9c7992 Author: Pramod Immaneni <[email protected]> Authored: Sat Sep 26 18:22:41 2015 -0700 Committer: Pramod Immaneni <[email protected]> Committed: Sat Sep 26 18:23:55 2015 -0700 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileInputOperator.java | 46 ++++++++ .../io/fs/AbstractFileInputOperatorTest.java | 116 ++++++++----------- 2 files changed, 93 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c85a4180/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index 63e5594..18fe83b 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -1158,4 +1158,50 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par return result; } } + + /** + * This is an implementation of the {@link AbstractFileInputOperator} that outputs the lines in a file. + * Each line is emitted as a separate tuple. It is emitted as a String. 
+ * <p> + * The directory path where to scan and read files from should be specified using the {@link #directory} property. + * </p> + * @displayName File Line Input + * @category Input + * @tags fs, file, line, lines, input operator + * + */ + public static class FileLineInputOperator extends AbstractFileInputOperator<String> + { + public transient final DefaultOutputPort<String> output = new DefaultOutputPort<String>(); + + protected transient BufferedReader br; + + @Override + protected InputStream openFile(Path path) throws IOException + { + InputStream is = super.openFile(path); + br = new BufferedReader(new InputStreamReader(is)); + return is; + } + + @Override + protected void closeFile(InputStream is) throws IOException + { + super.closeFile(is); + br.close(); + br = null; + } + + @Override + protected String readEntity() throws IOException + { + return br.readLine(); + } + + @Override + protected void emit(String tuple) + { + output.emit(tuple); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c85a4180/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java index 2333344..d50ec17 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java @@ -15,30 +15,42 @@ */ package com.datatorrent.lib.io.fs; -import com.datatorrent.api.*; -import com.datatorrent.api.Partitioner.Partition; -import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.io.IdempotentStorageManager; -import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner; -import 
com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; +import java.io.File; +import java.io.IOException; +import java.util.*; import com.esotericsoftware.kryo.Kryo; -import com.google.common.collect.*; - -import java.io.*; -import java.util.*; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; -import org.junit.*; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner; +import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator; +import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Partitioner.Partition; +import com.datatorrent.api.StatsListener; + public class AbstractFileInputOperatorTest { public static class TestMeta extends TestWatcher @@ -70,41 +82,7 @@ public class AbstractFileInputOperatorTest } @Rule public TestMeta testMeta = new TestMeta(); - - public static class TestFileInputOperator extends AbstractFileInputOperator<String> - { - public final 
transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); - private transient BufferedReader br = null; - - @Override - protected InputStream openFile(Path path) throws IOException - { - InputStream is = super.openFile(path); - br = new BufferedReader(new InputStreamReader(is)); - return is; - } - - @Override - protected void closeFile(InputStream is) throws IOException - { - super.closeFile(is); - br.close(); - br = null; - } - - @Override - protected String readEntity() throws IOException - { - return br.readLine(); - } - - @Override - protected void emit(String tuple) - { - output.emit(tuple); - } - } - + @Test public void testSinglePartiton() throws Exception { @@ -119,7 +97,7 @@ public class AbstractFileInputOperatorTest FileUtils.write(new File(testMeta.dir, "file"+file), StringUtils.join(lines, '\n')); } - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -169,7 +147,7 @@ public class AbstractFileInputOperatorTest @Test public void testPartitioning() throws Exception { - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); @@ -208,12 +186,12 @@ public class AbstractFileInputOperatorTest public void testPartitioningStateTransfer() throws Exception { - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); oper.setScanIntervalMillis(0); - TestFileInputOperator initialState = new Kryo().copy(oper); + FileLineInputOperator initialState = new Kryo().copy(oper); // Create 4 files with 3 
records each. Path path = new Path(new File(testMeta.dir).getAbsolutePath()); @@ -263,7 +241,7 @@ public class AbstractFileInputOperatorTest /* Collect all operators in a list */ List<AbstractFileInputOperator<String>> opers = Lists.newArrayList(); for (Partition<AbstractFileInputOperator<String>> p : newPartitions) { - TestFileInputOperator oi = (TestFileInputOperator)p.getPartitionedInstance(); + FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance(); oi.setup(testMeta.context); oi.output.setSink(sink); opers.add(oi); @@ -311,13 +289,13 @@ public class AbstractFileInputOperatorTest @Test public void testPartitioningStateTransferInterrupted() throws Exception { - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); oper.setScanIntervalMillis(0); oper.setEmitBatchSize(2); - TestFileInputOperator initialState = new Kryo().copy(oper); + FileLineInputOperator initialState = new Kryo().copy(oper); // Create 4 files with 3 records each. 
Path path = new Path(new File(testMeta.dir).getAbsolutePath()); @@ -367,7 +345,7 @@ public class AbstractFileInputOperatorTest /* Collect all operators in a list */ List<AbstractFileInputOperator<String>> opers = Lists.newArrayList(); for (Partition<AbstractFileInputOperator<String>> p : newPartitions) { - TestFileInputOperator oi = (TestFileInputOperator)p.getPartitionedInstance(); + FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance(); oi.setup(testMeta.context); oi.output.setSink(sink); opers.add(oi); @@ -396,13 +374,13 @@ public class AbstractFileInputOperatorTest @Test public void testPartitioningStateTransferFailure() throws Exception { - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); oper.setScanIntervalMillis(0); oper.setEmitBatchSize(2); - TestFileInputOperator initialState = new Kryo().copy(oper); + FileLineInputOperator initialState = new Kryo().copy(oper); // Create 4 files with 3 records each. 
Path path = new Path(new File(testMeta.dir).getAbsolutePath()); @@ -452,7 +430,7 @@ public class AbstractFileInputOperatorTest /* Collect all operators in a list */ List<AbstractFileInputOperator<String>> opers = Lists.newArrayList(); for (Partition<AbstractFileInputOperator<String>> p : newPartitions) { - TestFileInputOperator oi = (TestFileInputOperator)p.getPartitionedInstance(); + FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance(); oi.setup(testMeta.context); oi.output.setSink(sink); opers.add(oi); @@ -486,7 +464,7 @@ public class AbstractFileInputOperatorTest FileUtils.write(testFile, StringUtils.join(lines, '\n')); - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); oper.scanner = null; oper.failedFiles.add(new AbstractFileInputOperator.FailedFile(testFile.getAbsolutePath(), 1)); @@ -521,7 +499,7 @@ public class AbstractFileInputOperatorTest File testFile = new File(testMeta.dir, "file0"); FileUtils.write(testFile, StringUtils.join(lines, '\n')); - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); oper.scanner = null; oper.unfinishedFiles.add(new AbstractFileInputOperator.FailedFile(testFile.getAbsolutePath(), 2)); @@ -556,7 +534,7 @@ public class AbstractFileInputOperatorTest File testFile = new File(testMeta.dir, "file0"); FileUtils.write(testFile, StringUtils.join(lines, '\n')); - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); oper.scanner = null; oper.pendingFiles.add(testFile.getAbsolutePath()); @@ -591,7 +569,7 @@ public class AbstractFileInputOperatorTest File testFile = new File(testMeta.dir, "file0"); FileUtils.write(testFile, StringUtils.join(lines, '\n')); - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); oper.scanner = null; oper.currentFile = 
testFile.getAbsolutePath(); oper.offset = 1; @@ -629,7 +607,7 @@ public class AbstractFileInputOperatorTest FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n')); } - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); @@ -678,7 +656,7 @@ public class AbstractFileInputOperatorTest FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n')); } - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); @@ -721,7 +699,7 @@ public class AbstractFileInputOperatorTest } FileUtils.write(new File(testMeta.dir, "file0"), StringUtils.join(lines, '\n')); - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); oper.setEmitBatchSize(5); @@ -783,7 +761,7 @@ public class AbstractFileInputOperatorTest FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n')); } - TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); @@ -833,7 +811,7 @@ public class AbstractFileInputOperatorTest @Test public void testIdempotentStorageManagerPartitioning() throws Exception { - 
TestFileInputOperator oper = new TestFileInputOperator(); + FileLineInputOperator oper = new FileLineInputOperator(); oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); oper.setIdempotentStorageManager(new TestStorageManager());
