Github user pasalkarsachin1 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1928#discussion_r100880730
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ZippedTextFileReader.java ---
    @@ -0,0 +1,241 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.hdfs.spout;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.util.LineReader;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.zip.ZipEntry;
    +import java.util.zip.ZipInputStream;
    +
    +// Todo: Track file offsets instead of line number
    +public class ZippedTextFileReader extends AbstractFileReader {
    +  public static final String[] defaultFields = {"line"};
    +  public static final String CHARSET = "hdfsspout.reader.charset";
    +  public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
    +
    +  private static final int DEFAULT_BUFF_SIZE = 4096;
    +  private long start;
    +  private long end;
    +  private LineReader in = null;
    +  private String zipEntryName; // Entry filename in current zipfile
    +  private String zipFilename;
    +  private BufferedReader reader;
    +  private ZipInputStream zip = null;
    +  private final Logger LOG = 
LoggerFactory.getLogger(ZippedTextFileReader.class);
    +  private ZippedTextFileReader.Offset offset;
    +  private Text value = new Text();  
    +  
    +  public ZippedTextFileReader(FileSystem fs, Path file, Map conf) throws 
IOException {
    +    this(fs, file, conf, new ZippedTextFileReader.Offset(0,0) );
    +  }
    +
    +  public ZippedTextFileReader(FileSystem fs, Path file, Map conf, String 
startOffset) throws IOException {
    +    this(fs, file, conf, new ZippedTextFileReader.Offset(startOffset) );
    +  }
    +
    +  private ZippedTextFileReader(FileSystem fs, Path file, Map conf, 
ZippedTextFileReader.Offset startOffset)
    +          throws IOException {
    +    super(fs, file);
    +    offset = startOffset;
    +    FSDataInputStream in = fs.open(file);
    +
    +    String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" 
: conf.get(CHARSET).toString();
    +    int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? 
DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
    +    reader = new BufferedReader(new InputStreamReader(in, charSet), 
buffSz);
    +    if(offset.charOffset >0) {
    +      reader.skip(offset.charOffset);
    +    }
    +
    +  }
    +  public void initialize(InputSplit inputSplit, TaskAttemptContext 
context) throws IOException, InterruptedException {
    +
    +           FileSplit split = (FileSplit) inputSplit;
    +           Configuration conf = context.getConfiguration();
    +
    +           // Initialize start and end markers for the current split.
    +           start = split.getStart();
    +           end = start + split.getLength();
    +
    +           final Path path = split.getPath(); // Filename of the current 
split.
    +           // key.set(path.getName()); // Set the current filename as key
    +           zipFilename = path.getName();
    +
    +           LOG.info("zipFilename: ============:" + zipFilename);
    +           System.out.println("zipFilename: ============:" + zipFilename);
    --- End diff --
    
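    Can you remove the syso (the `System.out.println` call)? The `LOG.info` line just above it already logs the same message.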


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to