Created a separate line-by-line file input reader operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1e979927 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1e979927 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1e979927 Branch: refs/heads/devel-3 Commit: 1e979927a72fdd2707b68f8fe732c9bb93afd50b Parents: 264f629 Author: Pramod Immaneni <[email protected]> Authored: Thu Mar 24 18:24:15 2016 -0700 Committer: Pramod Immaneni <[email protected]> Committed: Mon Apr 4 11:25:58 2016 -0700 ---------------------------------------------------------------------- library/pom.xml | 2 +- .../lib/io/fs/AbstractFileInputOperator.java | 44 +++++++++--- .../malhar/fs/LineByLineFileInputOperator.java | 75 ++++++++++++++++++++ .../io/fs/AbstractFileInputOperatorTest.java | 62 ++++++++-------- 4 files changed, 145 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1e979927/library/pom.xml ---------------------------------------------------------------------- diff --git a/library/pom.xml b/library/pom.xml index 5ff8842..d5511c4 100644 --- a/library/pom.xml +++ b/library/pom.xml @@ -186,7 +186,7 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> <configuration> - <maxAllowedViolations>15966</maxAllowedViolations> + <maxAllowedViolations>15959</maxAllowedViolations> <consoleOutput>false</consoleOutput> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1e979927/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index 
0bcf956..ef27cf1 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -18,30 +18,52 @@ */ package com.datatorrent.lib.io.fs; -import java.io.*; -import java.util.*; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.validation.constraints.NotNull; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.fs.LineByLineFileInputOperator; import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import com.datatorrent.lib.counters.BasicCounters; -import com.datatorrent.lib.io.IdempotentStorageManager; -import com.datatorrent.api.*; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + import com.datatorrent.api.Context.CountersAggregator; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.StatsListener; + +import 
com.datatorrent.lib.counters.BasicCounters; +import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.util.KryoCloneUtils; /** @@ -1154,11 +1176,15 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par } /** + * This class is deprecated, use {@link LineByLineFileInputOperator} + * <p> * This is an implementation of the {@link AbstractFileInputOperator} that outputs the lines in a file. * Each line is emitted as a separate tuple. It is emitted as a String. + * </p> * <p> * The directory path where to scan and read files from should be specified using the {@link #directory} property. * </p> + * @deprecated * @displayName File Line Input * @category Input * @tags fs, file, line, lines, input operator http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1e979927/library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java b/library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java new file mode 100644 index 0000000..ad85d01 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.fs; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.DefaultOutputPort; + +import com.datatorrent.lib.io.fs.AbstractFileInputOperator; + +/** + * This is an extension of the {@link AbstractFileInputOperator} that outputs the contents of a file line by line. + * Each line is emitted as a separate tuple in string format. + * <p> + * The directory path where to scan and read files from should be specified using the {@link #directory} property. 
+ * </p> + * @displayName File Line Input + * @category Input + * @tags fs, file, line, lines, input operator + */ +public class LineByLineFileInputOperator extends AbstractFileInputOperator<String> +{ + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); + + protected transient BufferedReader br; + + @Override + protected InputStream openFile(Path path) throws IOException + { + InputStream is = super.openFile(path); + br = new BufferedReader(new InputStreamReader(is)); + return is; + } + + @Override + protected void closeFile(InputStream is) throws IOException + { + super.closeFile(is); + br.close(); + br = null; + } + + @Override + protected String readEntity() throws IOException + { + return br.readLine(); + } + + @Override + protected void emit(String tuple) + { + output.emit(tuple); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1e979927/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java index e9cd0d2..8868b9d 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java @@ -20,7 +20,13 @@ package com.datatorrent.lib.io.fs; import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.junit.Assert; import org.junit.Rule; @@ -28,6 +34,7 @@ import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import 
org.apache.apex.malhar.fs.LineByLineFileInputOperator; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -41,20 +48,19 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Partitioner.Partition; +import com.datatorrent.api.StatsListener; + import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner; -import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator; import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.TestUtils; -import com.datatorrent.api.Attribute; -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultPartition; -import com.datatorrent.api.Partitioner.Partition; -import com.datatorrent.api.StatsListener; - public class AbstractFileInputOperatorTest { public static class TestMeta extends TestWatcher @@ -101,7 +107,7 @@ public class AbstractFileInputOperatorTest FileUtils.write(new File(testMeta.dir, "file"+file), StringUtils.join(lines, '\n')); } - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -151,7 +157,7 @@ public class AbstractFileInputOperatorTest @Test public void testPartitioning() throws Exception { - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); 
oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); @@ -190,12 +196,12 @@ public class AbstractFileInputOperatorTest public void testPartitioningStateTransfer() throws Exception { - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); oper.setScanIntervalMillis(0); - FileLineInputOperator initialState = new Kryo().copy(oper); + LineByLineFileInputOperator initialState = new Kryo().copy(oper); // Create 4 files with 3 records each. Path path = new Path(new File(testMeta.dir).getAbsolutePath()); @@ -245,7 +251,7 @@ public class AbstractFileInputOperatorTest /* Collect all operators in a list */ List<AbstractFileInputOperator<String>> opers = Lists.newArrayList(); for (Partition<AbstractFileInputOperator<String>> p : newPartitions) { - FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance(); + LineByLineFileInputOperator oi = (LineByLineFileInputOperator)p.getPartitionedInstance(); oi.setup(testMeta.context); oi.output.setSink(sink); opers.add(oi); @@ -293,13 +299,13 @@ public class AbstractFileInputOperatorTest @Test public void testPartitioningStateTransferInterrupted() throws Exception { - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); oper.setScanIntervalMillis(0); oper.setEmitBatchSize(2); - FileLineInputOperator initialState = new Kryo().copy(oper); + LineByLineFileInputOperator initialState = new Kryo().copy(oper); // Create 4 files with 3 records each. 
Path path = new Path(new File(testMeta.dir).getAbsolutePath()); @@ -349,7 +355,7 @@ public class AbstractFileInputOperatorTest /* Collect all operators in a list */ List<AbstractFileInputOperator<String>> opers = Lists.newArrayList(); for (Partition<AbstractFileInputOperator<String>> p : newPartitions) { - FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance(); + LineByLineFileInputOperator oi = (LineByLineFileInputOperator)p.getPartitionedInstance(); oi.setup(testMeta.context); oi.output.setSink(sink); opers.add(oi); @@ -378,13 +384,13 @@ public class AbstractFileInputOperatorTest @Test public void testPartitioningStateTransferFailure() throws Exception { - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); oper.setScanIntervalMillis(0); oper.setEmitBatchSize(2); - FileLineInputOperator initialState = new Kryo().copy(oper); + LineByLineFileInputOperator initialState = new Kryo().copy(oper); // Create 4 files with 3 records each. 
Path path = new Path(new File(testMeta.dir).getAbsolutePath()); @@ -434,7 +440,7 @@ public class AbstractFileInputOperatorTest /* Collect all operators in a list */ List<AbstractFileInputOperator<String>> opers = Lists.newArrayList(); for (Partition<AbstractFileInputOperator<String>> p : newPartitions) { - FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance(); + LineByLineFileInputOperator oi = (LineByLineFileInputOperator)p.getPartitionedInstance(); oi.setup(testMeta.context); oi.output.setSink(sink); opers.add(oi); @@ -468,7 +474,7 @@ public class AbstractFileInputOperatorTest FileUtils.write(testFile, StringUtils.join(lines, '\n')); - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); oper.scanner = null; oper.failedFiles.add(new AbstractFileInputOperator.FailedFile(testFile.getAbsolutePath(), 1)); @@ -503,7 +509,7 @@ public class AbstractFileInputOperatorTest File testFile = new File(testMeta.dir, "file0"); FileUtils.write(testFile, StringUtils.join(lines, '\n')); - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); oper.scanner = null; oper.unfinishedFiles.add(new AbstractFileInputOperator.FailedFile(testFile.getAbsolutePath(), 2)); @@ -538,7 +544,7 @@ public class AbstractFileInputOperatorTest File testFile = new File(testMeta.dir, "file0"); FileUtils.write(testFile, StringUtils.join(lines, '\n')); - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); oper.scanner = null; oper.pendingFiles.add(testFile.getAbsolutePath()); @@ -573,7 +579,7 @@ public class AbstractFileInputOperatorTest File testFile = new File(testMeta.dir, "file0"); FileUtils.write(testFile, StringUtils.join(lines, '\n')); - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new 
LineByLineFileInputOperator(); oper.scanner = null; oper.currentFile = testFile.getAbsolutePath(); oper.offset = 1; @@ -611,7 +617,7 @@ public class AbstractFileInputOperatorTest FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n')); } - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); @@ -660,7 +666,7 @@ public class AbstractFileInputOperatorTest FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n')); } - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); @@ -703,7 +709,7 @@ public class AbstractFileInputOperatorTest } FileUtils.write(new File(testMeta.dir, "file0"), StringUtils.join(lines, '\n')); - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); oper.setEmitBatchSize(5); @@ -765,7 +771,7 @@ public class AbstractFileInputOperatorTest FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n')); } - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); @@ -815,7 +821,7 @@ public 
class AbstractFileInputOperatorTest @Test public void testIdempotentStorageManagerPartitioning() throws Exception { - FileLineInputOperator oper = new FileLineInputOperator(); + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); oper.setIdempotentStorageManager(new TestStorageManager());
