merged pramod/APEXMALHAR-2025
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b4fd6a60 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b4fd6a60 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b4fd6a60 Branch: refs/heads/devel-3 Commit: b4fd6a60d8bc036e91ed1803f3e809ea86941d22 Parents: 723189b 1e97992 Author: Chandni Singh <[email protected]> Authored: Mon Apr 4 13:35:33 2016 -0700 Committer: Chandni Singh <[email protected]> Committed: Mon Apr 4 13:35:33 2016 -0700 ---------------------------------------------------------------------- library/pom.xml | 2 +- .../lib/io/fs/AbstractFileInputOperator.java | 44 +++++++++--- .../malhar/fs/LineByLineFileInputOperator.java | 75 ++++++++++++++++++++ .../io/fs/AbstractFileInputOperatorTest.java | 62 ++++++++-------- 4 files changed, 145 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b4fd6a60/library/pom.xml ---------------------------------------------------------------------- diff --cc library/pom.xml index 0fcc3cb,d5511c4..ffee7f9 --- a/library/pom.xml +++ b/library/pom.xml @@@ -186,7 -186,7 +186,7 @@@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> <configuration> - <maxAllowedViolations>16002</maxAllowedViolations> - <maxAllowedViolations>15959</maxAllowedViolations> ++ <maxAllowedViolations>15995</maxAllowedViolations> <consoleOutput>false</consoleOutput> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b4fd6a60/library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java ---------------------------------------------------------------------- diff --cc library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java index 0000000,ad85d01..812709e mode 000000,100644..100644 --- a/library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java @@@ -1,0 -1,75 +1,75 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.apex.malhar.fs; + + import java.io.BufferedReader; + import java.io.IOException; + import java.io.InputStream; + import java.io.InputStreamReader; + + import org.apache.hadoop.fs.Path; + + import com.datatorrent.api.DefaultOutputPort; + + import com.datatorrent.lib.io.fs.AbstractFileInputOperator; + + /** + * This is an extension of the {@link AbstractFileInputOperator} that outputs the contents of a file line by line. + * Each line is emitted as a separate tuple in string format. + * <p> + * The directory path where to scan and read files from should be specified using the {@link #directory} property. + * </p> - * @displayName File Line Input ++ * @displayName Line-by-line File Input + * @category Input + * @tags fs, file, line, lines, input operator + */ + public class LineByLineFileInputOperator extends AbstractFileInputOperator<String> + { + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); + + protected transient BufferedReader br; + + @Override + protected InputStream openFile(Path path) throws IOException + { + InputStream is = super.openFile(path); + br = new BufferedReader(new InputStreamReader(is)); + return is; + } + + @Override + protected void closeFile(InputStream is) throws IOException + { + super.closeFile(is); + br.close(); + br = null; + } + + @Override + protected String readEntity() throws IOException + { + return br.readLine(); + } + + @Override + protected void emit(String tuple) + { + output.emit(tuple); + } + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b4fd6a60/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java ----------------------------------------------------------------------
