[ https://issues.apache.org/jira/browse/STORM-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15057354#comment-15057354 ]
ASF GitHub Bot commented on STORM-1199: --------------------------------------- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/936#discussion_r47597484 --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java --- @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.hdfs.spout; + +import backtype.storm.tuple.Fields; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +// Todo: Track file offsets instead of line number +class TextFileReader extends AbstractFileReader { + public static final String CHARSET = "hdfsspout.reader.charset"; + public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; + + public static final String DEFAULT_FIELD_NAME = "line"; + + private static final int DEFAULT_BUFF_SIZE = 4096; + + private BufferedReader reader; + private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class); + private TextFileReader.Offset offset; + + public TextFileReader(FileSystem fs, Path file, Map conf) throws IOException { + super(fs, file, new Fields(DEFAULT_FIELD_NAME)); + FSDataInputStream in = fs.open(file); + String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString(); + int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); + reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz); + offset = new TextFileReader.Offset(0,0); + } + + public TextFileReader(FileSystem fs, Path file, Map conf, String startOffset) throws IOException { + super(fs, file, new Fields(DEFAULT_FIELD_NAME)); + offset = new TextFileReader.Offset(startOffset); + FSDataInputStream in = fs.open(file); + in.seek(offset.byteOffset); + String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString(); + int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? 
DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); + reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz); + } --- End diff -- The two constructors share similar logic. You may want to restructure them as below, with the first delegating to the second. public TextFileReader(FileSystem fs, Path file, Map conf) throws IOException { this(fs, file, conf, null); } public TextFileReader(FileSystem fs, Path file, Map conf, String startOffset) throws IOException { super(fs, file, new Fields(DEFAULT_FIELD_NAME)); offset = startOffset != null? new TextFileReader.Offset(startOffset) : new TextFileReader.Offset(0,0); FSDataInputStream in = fs.open(file); in.seek(offset.byteOffset); String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString(); int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz); } > Create HDFS Spout > ----------------- > > Key: STORM-1199 > URL: https://issues.apache.org/jira/browse/STORM-1199 > Project: Apache Storm > Issue Type: New Feature > Reporter: Roshan Naik > Assignee: Roshan Naik > Attachments: HDFSSpoutforStorm v2.pdf, HDFSSpoutforStorm.pdf, > hdfs-spout.1.patch > > > Create an HDFS spout so that Storm can ingest data from files in an HDFS > directory -- This message was sent by Atlassian JIRA (v6.3.4#6332)