http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 6d626be..af0127a 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -1,12 +1,7 @@
-
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
@@ -28,77 +23,60 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.storm.Config;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.storm.Config;
 import org.apache.storm.hdfs.common.HdfsUtils;
 import org.apache.storm.hdfs.security.HdfsSecurityUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HdfsSpout extends BaseRichSpout {
 
+    // other members
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class);
+    private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
+    HashMap<MessageId, List<Object>> inflight = new HashMap<>();
+    LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = 
new LinkedBlockingQueue<>();
+    HdfsUtils.Pair<Path, FileLock.LogEntry> lastExpiredLock = null;
     // user configurable
     private String hdfsUri;            // required
     private String readerType;         // required
     private Fields outputFields;       // required
-
     private String sourceDir;        // required
     private Path sourceDirPath;        // required
-
     private String archiveDir;       // required
     private Path archiveDirPath;       // required
-
     private String badFilesDir;      // required
     private Path badFilesDirPath;      // required
-
     private String lockDir;
     private Path lockDirPath;
-
     private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT;
     private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC;
     private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING;
     private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT;
     private boolean clocksInSync = true;
-
     private String inprogress_suffix = ".inprogress"; // not configurable to 
prevent change between topology restarts
     private String ignoreSuffix = ".ignore";
-
     private String outputStreamName = null;
-
-    // other members
-    private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class);
-
     private ProgressTracker tracker = null;
-
     private FileSystem hdfs;
     private FileReader reader;
-
     private SpoutOutputCollector collector;
-    HashMap<MessageId, List<Object>> inflight = new HashMap<>();
-    LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = 
new LinkedBlockingQueue<>();
-
     private Configuration hdfsConfig;
-
     private Map<String, Object> conf = null;
     private FileLock lock;
     private String spoutId = null;
-
-    HdfsUtils.Pair<Path, FileLock.LogEntry> lastExpiredLock = null;
     private long lastExpiredLockTime = 0;
-
     private long tupleCounter = 0;
     private boolean ackEnabled = false;
     private int acksSinceLastCommit = 0;
-    private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
     private Timer commitTimer;
     private boolean fileReadCompletely = true;
 
@@ -107,6 +85,59 @@ public class HdfsSpout extends BaseRichSpout {
     public HdfsSpout() {
     }
 
+    private static String getFileProgress(FileReader reader) {
+        return reader.getFilePath() + " " + reader.getFileOffset();
+    }
+
+    private static void releaseLockAndLog(FileLock fLock, String spoutId) {
+        try {
+            if (fLock != null) {
+                fLock.release();
+                LOG.debug("Spout {} released FileLock. SpoutId = {}", 
fLock.getLockFile(), spoutId);
+            }
+        } catch (IOException e) {
+            LOG.error("Unable to delete lock file : " + fLock.getLockFile() + 
" SpoutId =" + spoutId, e);
+        }
+    }
+
+    private static void validateOrMakeDir(FileSystem fs, Path dir, String 
dirDescription) {
+        try {
+            if (fs.exists(dir)) {
+                if (!fs.isDirectory(dir)) {
+                    LOG.error(dirDescription + " directory is a file, not a 
dir. " + dir);
+                    throw new RuntimeException(dirDescription + " directory is 
a file, not a dir. " + dir);
+                }
+            } else if (!fs.mkdirs(dir)) {
+                LOG.error("Unable to create " + dirDescription + " directory " 
+ dir);
+                throw new RuntimeException("Unable to create " + 
dirDescription + " directory " + dir);
+            }
+        } catch (IOException e) {
+            LOG.error("Unable to create " + dirDescription + " directory " + 
dir, e);
+            throw new RuntimeException("Unable to create " + dirDescription + 
" directory " + dir, e);
+        }
+    }
+
+    static void checkValidReader(String readerType) {
+        if (readerType.equalsIgnoreCase(Configs.TEXT) || 
readerType.equalsIgnoreCase(Configs.SEQ)) {
+            return;
+        }
+        try {
+            Class<?> classType = Class.forName(readerType);
+            classType.getConstructor(FileSystem.class, Path.class, Map.class);
+            if (!FileReader.class.isAssignableFrom(classType)) {
+                LOG.error(readerType + " not a FileReader");
+                throw new IllegalArgumentException(readerType + " not a 
FileReader.");
+            }
+            return;
+        } catch (ClassNotFoundException e) {
+            LOG.error(readerType + " not found in classpath.", e);
+            throw new IllegalArgumentException(readerType + " not found in 
classpath.", e);
+        } catch (NoSuchMethodException e) {
+            LOG.error(readerType + " is missing the expected constructor for 
Readers.", e);
+            throw new IllegalArgumentException(readerType + " is missing the 
expected constuctor for Readers.");
+        }
+    }
+
     public HdfsSpout setHdfsUri(String hdfsUri) {
         this.hdfsUri = hdfsUri;
         return this;
@@ -211,8 +242,8 @@ public class HdfsSpout extends BaseRichSpout {
 
         if (ackEnabled && tracker.size() >= maxOutstanding) {
             LOG.warn("Waiting for more ACKs before generating new tuples. "
-                + "Progress tracker size has reached limit {}, SpoutID {}",
-                 maxOutstanding, spoutId);
+                     + "Progress tracker size has reached limit {}, SpoutID 
{}",
+                     maxOutstanding, spoutId);
             // Don't emit anything .. allow configured spout wait strategy to 
kick in
             return;
         }
@@ -260,7 +291,7 @@ public class HdfsSpout extends BaseRichSpout {
                 return;
             } catch (ParseException e) {
                 LOG.error("Parsing error when processing at file location " + 
getFileProgress(reader)
-                    + ". Skipping remainder of file.", e);
+                          + ". Skipping remainder of file.", e);
                 markFileAsBad(reader.getFilePath());
                 // Note: We don't return from this method on ParseException to 
avoid triggering the
                 // spout wait strategy (due to no emits). Instead we go back 
into the loop and
@@ -301,10 +332,6 @@ public class HdfsSpout extends BaseRichSpout {
         commitTimer.schedule(timerTask, commitFrequencySec * 1000);
     }
 
-    private static String getFileProgress(FileReader reader) {
-        return reader.getFilePath() + " " + reader.getFileOffset();
-    }
-
     private void markFileAsDone(Path filePath) {
         try {
             Path newFile = renameCompletedFile(reader.getFilePath());
@@ -321,7 +348,8 @@ public class HdfsSpout extends BaseRichSpout {
         String originalName = new Path(fileNameMinusSuffix).getName();
         Path newFile = new Path(badFilesDirPath + Path.SEPARATOR + 
originalName);
 
-        LOG.info("Moving bad file {} to {}. Processed it till offset {}. 
SpoutID= {}", originalName, newFile, tracker.getCommitPosition(), spoutId);
+        LOG.info("Moving bad file {} to {}. Processed it till offset {}. 
SpoutID= {}", originalName, newFile, tracker.getCommitPosition(),
+                 spoutId);
         try {
             if (!hdfs.rename(file, newFile)) { // seems this can fail by 
returning false or throwing exception
                 throw new IOException("Move failed for bad file: " + file); // 
convert false ret value to exception
@@ -343,17 +371,6 @@ public class HdfsSpout extends BaseRichSpout {
         lock = null;
     }
 
-    private static void releaseLockAndLog(FileLock fLock, String spoutId) {
-        try {
-            if (fLock != null) {
-                fLock.release();
-                LOG.debug("Spout {} released FileLock. SpoutId = {}", 
fLock.getLockFile(), spoutId);
-            }
-        } catch (IOException e) {
-            LOG.error("Unable to delete lock file : " + fLock.getLockFile() + 
" SpoutId =" + spoutId, e);
-        }
-    }
-
     protected void emitData(List<Object> tuple, MessageId id) {
         LOG.trace("Emitting - {}", id);
 
@@ -512,48 +529,10 @@ public class HdfsSpout extends BaseRichSpout {
         this.commitTimer.cancel();
     }
 
-    private static void validateOrMakeDir(FileSystem fs, Path dir, String 
dirDescription) {
-        try {
-            if (fs.exists(dir)) {
-                if (!fs.isDirectory(dir)) {
-                    LOG.error(dirDescription + " directory is a file, not a 
dir. " + dir);
-                    throw new RuntimeException(dirDescription + " directory is 
a file, not a dir. " + dir);
-                }
-            } else if (!fs.mkdirs(dir)) {
-                LOG.error("Unable to create " + dirDescription + " directory " 
+ dir);
-                throw new RuntimeException("Unable to create " + 
dirDescription + " directory " + dir);
-            }
-        } catch (IOException e) {
-            LOG.error("Unable to create " + dirDescription + " directory " + 
dir, e);
-            throw new RuntimeException("Unable to create " + dirDescription + 
" directory " + dir, e);
-        }
-    }
-
     private String getDefaultLockDir(Path sourceDirPath) {
         return sourceDirPath.toString() + Path.SEPARATOR + 
Configs.DEFAULT_LOCK_DIR;
     }
 
-    static void checkValidReader(String readerType) {
-        if (readerType.equalsIgnoreCase(Configs.TEXT) || 
readerType.equalsIgnoreCase(Configs.SEQ)) {
-            return;
-        }
-        try {
-            Class<?> classType = Class.forName(readerType);
-            classType.getConstructor(FileSystem.class, Path.class, Map.class);
-            if (!FileReader.class.isAssignableFrom(classType)) {
-                LOG.error(readerType + " not a FileReader");
-                throw new IllegalArgumentException(readerType + " not a 
FileReader.");
-            }
-            return;
-        } catch (ClassNotFoundException e) {
-            LOG.error(readerType + " not found in classpath.", e);
-            throw new IllegalArgumentException(readerType + " not found in 
classpath.", e);
-        } catch (NoSuchMethodException e) {
-            LOG.error(readerType + " is missing the expected constructor for 
Readers.", e);
-            throw new IllegalArgumentException(readerType + " is missing the 
expected constuctor for Readers.");
-        }
-    }
-
     @Override
     public void ack(Object msgId) {
         LOG.trace("Ack received for msg {} on spout {}", msgId, spoutId);

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
index fdf7751f..a7845ea 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
@@ -1,26 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.spout;
 
 public class ParseException extends Exception {
-  public ParseException(String message, Throwable cause) {
-    super(message, cause);
-  }
+    public ParseException(String message, Throwable cause) {
+        super(message, cause);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
index e2e7126..93a9b09 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.spout;
@@ -23,48 +17,48 @@ import java.util.TreeSet;
 
 public class ProgressTracker {
 
-  TreeSet<FileOffset> offsets = new TreeSet<>();
+    TreeSet<FileOffset> offsets = new TreeSet<>();
 
-  public synchronized void recordAckedOffset(FileOffset newOffset) {
-    if(newOffset==null) {
-      return;
-    }
-    offsets.add(newOffset);
+    public synchronized void recordAckedOffset(FileOffset newOffset) {
+        if (newOffset == null) {
+            return;
+        }
+        offsets.add(newOffset);
 
-    FileOffset currHead = offsets.first();
+        FileOffset currHead = offsets.first();
 
-    if( currHead.isNextOffset(newOffset) ) { // check is a minor optimization
-      trimHead();
+        if (currHead.isNextOffset(newOffset)) { // check is a minor 
optimization
+            trimHead();
+        }
     }
-  }
 
-  // remove contiguous elements from the head of the heap
-  // e.g.:  1,2,3,4,10,11,12,15  =>  4,10,11,12,15
-  private synchronized void trimHead() {
-    if(offsets.size()<=1) {
-      return;
-    }
-    FileOffset head = offsets.first();
-    FileOffset head2 = offsets.higher(head);
-    if( head.isNextOffset(head2) ) {
-      offsets.pollFirst();
-      trimHead();
+    // remove contiguous elements from the head of the heap
+    // e.g.:  1,2,3,4,10,11,12,15  =>  4,10,11,12,15
+    private synchronized void trimHead() {
+        if (offsets.size() <= 1) {
+            return;
+        }
+        FileOffset head = offsets.first();
+        FileOffset head2 = offsets.higher(head);
+        if (head.isNextOffset(head2)) {
+            offsets.pollFirst();
+            trimHead();
+        }
+        return;
     }
-    return;
-  }
 
-  public synchronized FileOffset getCommitPosition() {
-    if(!offsets.isEmpty()) {
-      return offsets.first().clone();
+    public synchronized FileOffset getCommitPosition() {
+        if (!offsets.isEmpty()) {
+            return offsets.first().clone();
+        }
+        return null;
     }
-    return null;
-  }
 
-  public synchronized void dumpState(PrintStream stream) {
-    stream.println(offsets);
-  }
+    public synchronized void dumpState(PrintStream stream) {
+        stream.println(offsets);
+    }
 
-  public synchronized int size() {
-    return offsets.size();
-  }
+    public synchronized int size() {
+        return offsets.size();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
index 64b6b7a..ab61c2b 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -1,23 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.spout;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -26,188 +25,185 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+public class SequenceFileReader<Key extends Writable, Value extends Writable>
+    extends AbstractFileReader {
+    public static final String[] defaultFields = { "key", "value" };
+    public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
+    private static final Logger LOG = LoggerFactory
+        .getLogger(SequenceFileReader.class);
+    private static final int DEFAULT_BUFF_SIZE = 4096;
+    private final SequenceFile.Reader reader;
 
-public class SequenceFileReader<Key extends Writable,Value extends Writable>
-        extends AbstractFileReader {
-  private static final Logger LOG = LoggerFactory
-          .getLogger(SequenceFileReader.class);
-  public static final String[] defaultFields = {"key", "value"};
-  private static final int DEFAULT_BUFF_SIZE = 4096;
-  public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
-
-  private final SequenceFile.Reader reader;
-
-  private final SequenceFileReader.Offset offset;
-
-
-  private final Key key;
-  private final Value value;
-
-
-  public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> conf)
-          throws IOException {
-    super(fs, file);
-    int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : 
Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
-    this.reader = new SequenceFile.Reader(fs.getConf(),  
SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) );
-    this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), 
fs.getConf() );
-    this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), 
fs.getConf() );
-    this.offset = new SequenceFileReader.Offset(0,0,0);
-  }
-
-  public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> 
conf, String offset)
-          throws IOException {
-    super(fs, file);
-    int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : 
Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
-    this.offset = new SequenceFileReader.Offset(offset);
-    this.reader = new SequenceFile.Reader(fs.getConf(),  
SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) );
-    this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), 
fs.getConf() );
-    this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), 
fs.getConf() );
-    skipToOffset(this.reader, this.offset, this.key);
-  }
-
-  private static <K> void skipToOffset(SequenceFile.Reader reader, Offset 
offset, K key) throws IOException {
-    reader.sync(offset.lastSyncPoint);
-    for(int i=0; i<offset.recordsSinceLastSync; ++i) {
-      reader.next(key);
-    }
-  }
-
-  public List<Object> next() throws IOException, ParseException {
-    if( reader.next(key, value) ) {
-      ArrayList<Object> result = new ArrayList<Object>(2);
-      Collections.addAll(result, key, value);
-      offset.increment(reader.syncSeen(), reader.getPosition() );
-      return result;
-    }
-    return null;
-  }
-
-  @Override
-  public void close() {
-    try {
-      reader.close();
-    } catch (IOException e) {
-      LOG.warn("Ignoring error when closing file " + getFilePath(), e);
-    }
-  }
+    private final SequenceFileReader.Offset offset;
 
-  public Offset getFileOffset() {
-      return offset;
-  }
 
+    private final Key key;
+    private final Value value;
 
-  public static class Offset implements  FileOffset {
-    public long lastSyncPoint;
-    public long recordsSinceLastSync;
-    public long currentRecord;
-    private long currRecordEndOffset;
-    private long prevRecordEndOffset;
 
-    public Offset(long lastSyncPoint, long recordsSinceLastSync, long 
currentRecord) {
-      this(lastSyncPoint, recordsSinceLastSync, currentRecord, 0, 0 );
+    public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> 
conf)
+        throws IOException {
+        super(fs, file);
+        int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : 
Integer.parseInt(conf.get(BUFFER_SIZE).toString());
+        this.reader = new SequenceFile.Reader(fs.getConf(), 
SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize));
+        this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), 
fs.getConf());
+        this.value = (Value) 
ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
+        this.offset = new SequenceFileReader.Offset(0, 0, 0);
     }
 
-    public Offset(long lastSyncPoint, long recordsSinceLastSync, long 
currentRecord
-                  , long currRecordEndOffset, long prevRecordEndOffset) {
-      this.lastSyncPoint = lastSyncPoint;
-      this.recordsSinceLastSync = recordsSinceLastSync;
-      this.currentRecord = currentRecord;
-      this.prevRecordEndOffset = prevRecordEndOffset;
-      this.currRecordEndOffset = currRecordEndOffset;
+    public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> 
conf, String offset)
+        throws IOException {
+        super(fs, file);
+        int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : 
Integer.parseInt(conf.get(BUFFER_SIZE).toString());
+        this.offset = new SequenceFileReader.Offset(offset);
+        this.reader = new SequenceFile.Reader(fs.getConf(), 
SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize));
+        this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), 
fs.getConf());
+        this.value = (Value) 
ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
+        skipToOffset(this.reader, this.offset, this.key);
     }
 
-    public Offset(String offset) {
-      try {
-        if(offset==null) {
-          throw new IllegalArgumentException("offset cannot be null");
-        }
-        if(offset.equalsIgnoreCase("0")) {
-          this.lastSyncPoint = 0;
-          this.recordsSinceLastSync = 0;
-          this.currentRecord = 0;
-          this.prevRecordEndOffset = 0;
-          this.currRecordEndOffset = 0;
-        } else {
-          String[] parts = offset.split(":");
-          this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1]);
-          this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1]);
-          this.currentRecord = Long.parseLong(parts[2].split("=")[1]);
-          this.prevRecordEndOffset = 0;
-          this.currRecordEndOffset = 0;
+    private static <K> void skipToOffset(SequenceFile.Reader reader, Offset 
offset, K key) throws IOException {
+        reader.sync(offset.lastSyncPoint);
+        for (int i = 0; i < offset.recordsSinceLastSync; ++i) {
+            reader.next(key);
         }
-      } catch (Exception e) {
-        throw new IllegalArgumentException("'" + offset +
-                "' cannot be interpreted. It is not in expected format for 
SequenceFileReader." +
-                " Format e.g. {sync=123:afterSync=345:record=67}");
-      }
     }
 
-    @Override
-    public String toString() {
-      return '{' +
-              "sync=" + lastSyncPoint +
-              ":afterSync=" + recordsSinceLastSync +
-              ":record=" + currentRecord +
-              ":}";
+    public List<Object> next() throws IOException, ParseException {
+        if (reader.next(key, value)) {
+            ArrayList<Object> result = new ArrayList<Object>(2);
+            Collections.addAll(result, key, value);
+            offset.increment(reader.syncSeen(), reader.getPosition());
+            return result;
+        }
+        return null;
     }
 
     @Override
-    public boolean isNextOffset(FileOffset rhs) {
-      if(rhs instanceof Offset) {
-        Offset other = ((Offset) rhs);
-        return  other.currentRecord > currentRecord+1;
-      }
-      return false;
+    public void close() {
+        try {
+            reader.close();
+        } catch (IOException e) {
+            LOG.warn("Ignoring error when closing file " + getFilePath(), e);
+        }
     }
 
-    @Override
-    public int compareTo(FileOffset o) {
-      Offset rhs = ((Offset) o);
-      if(currentRecord<rhs.currentRecord) {
-        return -1;
-      }
-      if(currentRecord==rhs.currentRecord) {
-        return 0;
-      }
-      return 1;
+    public Offset getFileOffset() {
+        return offset;
     }
 
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) { return true; }
-      if (!(o instanceof Offset)) { return false; }
 
-      Offset offset = (Offset) o;
+    public static class Offset implements FileOffset {
+        public long lastSyncPoint;
+        public long recordsSinceLastSync;
+        public long currentRecord;
+        private long currRecordEndOffset;
+        private long prevRecordEndOffset;
 
-      return currentRecord == offset.currentRecord;
-    }
+        public Offset(long lastSyncPoint, long recordsSinceLastSync, long 
currentRecord) {
+            this(lastSyncPoint, recordsSinceLastSync, currentRecord, 0, 0);
+        }
 
-    @Override
-    public int hashCode() {
-      return (int) (currentRecord ^ (currentRecord >>> 32));
-    }
-    
-    void increment(boolean syncSeen, long newBytePosition) {
-      if(!syncSeen) {
-        ++recordsSinceLastSync;
-      }  else {
-        recordsSinceLastSync = 1;
-        lastSyncPoint = prevRecordEndOffset;
-      }
-      ++currentRecord;
-      prevRecordEndOffset = currRecordEndOffset;
-      currentRecord = newBytePosition;
-    }
+        public Offset(long lastSyncPoint, long recordsSinceLastSync, long 
currentRecord
+            , long currRecordEndOffset, long prevRecordEndOffset) {
+            this.lastSyncPoint = lastSyncPoint;
+            this.recordsSinceLastSync = recordsSinceLastSync;
+            this.currentRecord = currentRecord;
+            this.prevRecordEndOffset = prevRecordEndOffset;
+            this.currRecordEndOffset = currRecordEndOffset;
+        }
 
-    @Override
-    public Offset clone() {
-      return new Offset(lastSyncPoint, recordsSinceLastSync, currentRecord, 
currRecordEndOffset, prevRecordEndOffset);
-    }
+        public Offset(String offset) {
+            try {
+                if (offset == null) {
+                    throw new IllegalArgumentException("offset cannot be 
null");
+                }
+                if (offset.equalsIgnoreCase("0")) {
+                    this.lastSyncPoint = 0;
+                    this.recordsSinceLastSync = 0;
+                    this.currentRecord = 0;
+                    this.prevRecordEndOffset = 0;
+                    this.currRecordEndOffset = 0;
+                } else {
+                    String[] parts = offset.split(":");
+                    this.lastSyncPoint = 
Long.parseLong(parts[0].split("=")[1]);
+                    this.recordsSinceLastSync = 
Long.parseLong(parts[1].split("=")[1]);
+                    this.currentRecord = 
Long.parseLong(parts[2].split("=")[1]);
+                    this.prevRecordEndOffset = 0;
+                    this.currRecordEndOffset = 0;
+                }
+            } catch (Exception e) {
+                throw new IllegalArgumentException("'" + offset +
+                                                   "' cannot be interpreted. 
It is not in expected format for SequenceFileReader." +
+                                                   " Format e.g. 
{sync=123:afterSync=345:record=67}");
+            }
+        }
+
+        @Override
+        public String toString() {
+            return '{' +
+                   "sync=" + lastSyncPoint +
+                   ":afterSync=" + recordsSinceLastSync +
+                   ":record=" + currentRecord +
+                   ":}";
+        }
+
+        @Override
+        public boolean isNextOffset(FileOffset rhs) {
+            if (rhs instanceof Offset) {
+                Offset other = ((Offset) rhs);
+                return other.currentRecord > currentRecord + 1;
+            }
+            return false;
+        }
+
+        @Override
+        public int compareTo(FileOffset o) {
+            Offset rhs = ((Offset) o);
+            if (currentRecord < rhs.currentRecord) {
+                return -1;
+            }
+            if (currentRecord == rhs.currentRecord) {
+                return 0;
+            }
+            return 1;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof Offset)) {
+                return false;
+            }
+
+            Offset offset = (Offset) o;
+
+            return currentRecord == offset.currentRecord;
+        }
+
+        @Override
+        public int hashCode() {
+            return (int) (currentRecord ^ (currentRecord >>> 32));
+        }
+
+        void increment(boolean syncSeen, long newBytePosition) {
+            if (!syncSeen) {
+                ++recordsSinceLastSync;
+            } else {
+                recordsSinceLastSync = 1;
+                lastSyncPoint = prevRecordEndOffset;
+            }
+            ++currentRecord;
+            prevRecordEndOffset = currRecordEndOffset;
+            currentRecord = newBytePosition;
+        }
+
+        @Override
+        public Offset clone() {
+            return new Offset(lastSyncPoint, recordsSinceLastSync, 
currentRecord, currRecordEndOffset, prevRecordEndOffset);
+        }
 
-  } //class Offset
+    } //class Offset
 } //class

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index a393238..cc5531e 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -1,192 +1,190 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.spout;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 // Todo: Track file offsets instead of line number
 public class TextFileReader extends AbstractFileReader {
-  public static final String[] defaultFields = {"line"};
-  public static final String CHARSET = "hdfsspout.reader.charset";
-  public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
-
-  private static final int DEFAULT_BUFF_SIZE = 4096;
-
-  private BufferedReader reader;
-  private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class);
-  private TextFileReader.Offset offset;
-
-  public TextFileReader(FileSystem fs, Path file, Map<String, Object> conf) 
throws IOException {
-    this(fs, file, conf, new TextFileReader.Offset(0,0) );
-  }
-
-  public TextFileReader(FileSystem fs, Path file, Map<String, Object> conf, 
String startOffset) throws IOException {
-    this(fs, file, conf, new TextFileReader.Offset(startOffset) );
-  }
-
-  private TextFileReader(FileSystem fs, Path file, Map<String, Object> conf, 
TextFileReader.Offset startOffset)
-          throws IOException {
-    super(fs, file);
-    offset = startOffset;
-    FSDataInputStream in = fs.open(file);
-
-    String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : 
conf.get(CHARSET).toString();
-    int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? 
DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
-    reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz);
-    if(offset.charOffset >0) {
-      reader.skip(offset.charOffset);
-    }
-
-  }
+    public static final String[] defaultFields = { "line" };
+    public static final String CHARSET = "hdfsspout.reader.charset";
+    public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
 
-  public Offset getFileOffset() {
-    return offset.clone();
-  }
+    private static final int DEFAULT_BUFF_SIZE = 4096;
+    private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class);
+    private BufferedReader reader;
+    private TextFileReader.Offset offset;
 
-  public List<Object> next() throws IOException, ParseException {
-    String line = readLineAndTrackOffset(reader);
-    if(line!=null) {
-      return Collections.singletonList((Object) line);
-    }
-    return null;
-  }
-
-  private String readLineAndTrackOffset(BufferedReader reader) throws 
IOException {
-    StringBuffer sb = new StringBuffer(1000);
-    long before = offset.charOffset;
-    int ch;
-    while( (ch = reader.read()) != -1 ) {
-      ++offset.charOffset;
-      if (ch == '\n') {
-        ++offset.lineNumber;
-        return sb.toString();
-      } else if( ch != '\r') {
-        sb.append((char)ch);
-      }
-    }
-    if(before==offset.charOffset) { // reached EOF, didnt read anything
-      return null;
+    public TextFileReader(FileSystem fs, Path file, Map<String, Object> conf) 
throws IOException {
+        this(fs, file, conf, new TextFileReader.Offset(0, 0));
     }
-    return sb.toString();
-  }
-
-  @Override
-  public void close() {
-    try {
-      reader.close();
-    } catch (IOException e) {
-      LOG.warn("Ignoring error when closing file " + getFilePath(), e);
+
+    public TextFileReader(FileSystem fs, Path file, Map<String, Object> conf, 
String startOffset) throws IOException {
+        this(fs, file, conf, new TextFileReader.Offset(startOffset));
     }
-  }
 
-  public static class Offset implements FileOffset {
-    long charOffset;
-    long lineNumber;
+    private TextFileReader(FileSystem fs, Path file, Map<String, Object> conf, 
TextFileReader.Offset startOffset)
+        throws IOException {
+        super(fs, file);
+        offset = startOffset;
+        FSDataInputStream in = fs.open(file);
+
+        String charSet = (conf == null || !conf.containsKey(CHARSET)) ? 
"UTF-8" : conf.get(CHARSET).toString();
+        int buffSz =
+            (conf == null || !conf.containsKey(BUFFER_SIZE)) ? 
DEFAULT_BUFF_SIZE : Integer.parseInt(conf.get(BUFFER_SIZE).toString());
+        reader = new BufferedReader(new InputStreamReader(in, charSet), 
buffSz);
+        if (offset.charOffset > 0) {
+            reader.skip(offset.charOffset);
+        }
 
-    public Offset(long byteOffset, long lineNumber) {
-      this.charOffset = byteOffset;
-      this.lineNumber = lineNumber;
     }
 
-    public Offset(String offset) {
-      if(offset==null) {
-        throw new IllegalArgumentException("offset cannot be null");
-      }
-      try {
-        if(offset.equalsIgnoreCase("0")) {
-          this.charOffset = 0;
-          this.lineNumber = 0;
-        } else {
-          String[] parts = offset.split(":");
-          this.charOffset = Long.parseLong(parts[0].split("=")[1]);
-          this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
-        }
-      } catch (Exception e) {
-        throw new IllegalArgumentException("'" + offset +
-                "' cannot be interpreted. It is not in expected format for 
TextFileReader." +
-                " Format e.g.  {char=123:line=5}");
-      }
+    public Offset getFileOffset() {
+        return offset.clone();
     }
 
-    @Override
-    public String toString() {
-      return '{' +
-              "char=" + charOffset +
-              ":line=" + lineNumber +
-              ":}";
+    public List<Object> next() throws IOException, ParseException {
+        String line = readLineAndTrackOffset(reader);
+        if (line != null) {
+            return Collections.singletonList((Object) line);
+        }
+        return null;
     }
 
-    @Override
-    public boolean isNextOffset(FileOffset rhs) {
-      if(rhs instanceof Offset) {
-        Offset other = ((Offset) rhs);
-        return  other.charOffset > charOffset &&
-                other.lineNumber == lineNumber+1;
-      }
-      return false;
+    private String readLineAndTrackOffset(BufferedReader reader) throws 
IOException {
+        StringBuffer sb = new StringBuffer(1000);
+        long before = offset.charOffset;
+        int ch;
+        while ((ch = reader.read()) != -1) {
+            ++offset.charOffset;
+            if (ch == '\n') {
+                ++offset.lineNumber;
+                return sb.toString();
+            } else if (ch != '\r') {
+                sb.append((char) ch);
+            }
+        }
+        if (before == offset.charOffset) { // reached EOF, didnt read anything
+            return null;
+        }
+        return sb.toString();
     }
 
     @Override
-    public int compareTo(FileOffset o) {
-      Offset rhs = ((Offset)o);
-      if(lineNumber < rhs.lineNumber) {
-        return -1;
-      }
-      if(lineNumber == rhs.lineNumber) {
-        return 0;
-      }
-      return 1;
+    public void close() {
+        try {
+            reader.close();
+        } catch (IOException e) {
+            LOG.warn("Ignoring error when closing file " + getFilePath(), e);
+        }
     }
 
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) { return true; }
-      if (!(o instanceof Offset)) { return false; }
+    public static class Offset implements FileOffset {
+        long charOffset;
+        long lineNumber;
 
-      Offset that = (Offset) o;
+        public Offset(long byteOffset, long lineNumber) {
+            this.charOffset = byteOffset;
+            this.lineNumber = lineNumber;
+        }
 
-      if (charOffset != that.charOffset)
-        return false;
-      return lineNumber == that.lineNumber;
-    }
+        public Offset(String offset) {
+            if (offset == null) {
+                throw new IllegalArgumentException("offset cannot be null");
+            }
+            try {
+                if (offset.equalsIgnoreCase("0")) {
+                    this.charOffset = 0;
+                    this.lineNumber = 0;
+                } else {
+                    String[] parts = offset.split(":");
+                    this.charOffset = Long.parseLong(parts[0].split("=")[1]);
+                    this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
+                }
+            } catch (Exception e) {
+                throw new IllegalArgumentException("'" + offset +
+                                                   "' cannot be interpreted. 
It is not in expected format for TextFileReader." +
+                                                   " Format e.g.  
{char=123:line=5}");
+            }
+        }
 
-    @Override
-    public int hashCode() {
-      int result = (int) (charOffset ^ (charOffset >>> 32));
-      result = 31 * result + (int) (lineNumber ^ (lineNumber >>> 32));
-      return result;
-    }
+        @Override
+        public String toString() {
+            return '{' +
+                   "char=" + charOffset +
+                   ":line=" + lineNumber +
+                   ":}";
+        }
 
-    @Override
-    public Offset clone() {
-      return new Offset(charOffset, lineNumber);
-    }
-  } //class Offset
+        @Override
+        public boolean isNextOffset(FileOffset rhs) {
+            if (rhs instanceof Offset) {
+                Offset other = ((Offset) rhs);
+                return other.charOffset > charOffset &&
+                       other.lineNumber == lineNumber + 1;
+            }
+            return false;
+        }
+
+        @Override
+        public int compareTo(FileOffset o) {
+            Offset rhs = ((Offset) o);
+            if (lineNumber < rhs.lineNumber) {
+                return -1;
+            }
+            if (lineNumber == rhs.lineNumber) {
+                return 0;
+            }
+            return 1;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof Offset)) {
+                return false;
+            }
+
+            Offset that = (Offset) o;
+
+            if (charOffset != that.charOffset) {
+                return false;
+            }
+            return lineNumber == that.lineNumber;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = (int) (charOffset ^ (charOffset >>> 32));
+            result = 31 * result + (int) (lineNumber ^ (lineNumber >>> 32));
+            return result;
+        }
+
+        @Override
+        public Offset clone() {
+            return new Offset(charOffset, lineNumber);
+        }
+    } //class Offset
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 07968f2..e6adfbb 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -15,8 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -43,32 +55,154 @@ import org.apache.storm.trident.tuple.TridentTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-
 public class HdfsState implements State {
 
+    public static final Logger LOG = LoggerFactory.getLogger(HdfsState.class);
+    private Options options;
+    private volatile TxnRecord lastSeenTxn;
+    private Path indexFilePath;
+
+
+    HdfsState(Options options) {
+        this.options = options;
+    }
+
+    void prepare(Map<String, Object> conf, IMetricsContext metrics, int 
partitionIndex, int numPartitions) {
+        this.options.prepare(conf, partitionIndex, numPartitions);
+        initLastTxn(conf, partitionIndex);
+    }
+
+    private TxnRecord readTxnRecord(Path path) throws IOException {
+        FSDataInputStream inputStream = null;
+        try {
+            inputStream = this.options.fs.open(path);
+            BufferedReader reader = new BufferedReader(new 
InputStreamReader(inputStream));
+            String line;
+            if ((line = reader.readLine()) != null) {
+                String[] fields = line.split(",");
+                return new TxnRecord(Long.valueOf(fields[0]), fields[1], 
Long.valueOf(fields[2]));
+            }
+        } finally {
+            if (inputStream != null) {
+                inputStream.close();
+            }
+        }
+        return new TxnRecord(0, options.currentFile.toString(), 0);
+    }
+
+    /**
+     * Returns temp file path corresponding to a file name.
+     */
+    private Path tmpFilePath(String filename) {
+        return new Path(filename + ".tmp");
+    }
+
+    /**
+     * Reads the last txn record from index file if it exists, if not from 
.tmp file if exists.
+     *
+     * @param indexFilePath the index file path
+     * @return the txn record from the index file or a default initial record.
+     *
+     * @throws IOException
+     */
+    private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
+        Path tmpPath = tmpFilePath(indexFilePath.toString());
+        if (this.options.fs.exists(indexFilePath)) {
+            return readTxnRecord(indexFilePath);
+        } else if (this.options.fs.exists(tmpPath)) {
+            return readTxnRecord(tmpPath);
+        }
+        return new TxnRecord(0, options.currentFile.toString(), 0);
+    }
+
+    private void initLastTxn(Map<String, Object> conf, int partition) {
+        // include partition id in the file name so that index for different 
partitions are independent.
+        String indexFileName = String.format(".index.%s.%d", 
conf.get(Config.TOPOLOGY_NAME), partition);
+        this.indexFilePath = new Path(options.fileNameFormat.getPath(), 
indexFileName);
+        try {
+            this.lastSeenTxn = getTxnRecord(indexFilePath);
+            LOG.debug("initLastTxn updated lastSeenTxn to [{}]", 
this.lastSeenTxn);
+        } catch (IOException e) {
+            LOG.warn("initLastTxn failed due to IOException.", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void updateIndex(long txId) {
+        LOG.debug("Starting index update.");
+        final Path tmpPath = tmpFilePath(indexFilePath.toString());
+
+        try (FSDataOutputStream out = this.options.fs.create(tmpPath, true);
+             BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out))) {
+            TxnRecord txnRecord = new TxnRecord(txId, 
options.currentFile.toString(), this.options.getCurrentOffset());
+            bw.write(txnRecord.toString());
+            bw.newLine();
+            bw.flush();
+            out.close();       /* In non error scenarios, for the Azure Data 
Lake Store File System (adl://),
+                               the output stream must be closed before the 
file associated with it is deleted.
+                               For ADLFS deleting the file also removes any 
handles to the file, hence out.close() will fail. */
+            /*
+             * Delete the current index file and rename the tmp file to 
atomically
+             * replace the index file. Orphan .tmp files are handled in 
getTxnRecord.
+             */
+            options.fs.delete(this.indexFilePath, false);
+            options.fs.rename(tmpPath, this.indexFilePath);
+            lastSeenTxn = txnRecord;
+            LOG.debug("updateIndex updated lastSeenTxn to [{}]", 
this.lastSeenTxn);
+        } catch (IOException e) {
+            LOG.warn("Begin commit failed due to IOException. Failing batch", 
e);
+            throw new FailedException(e);
+        }
+    }
+
+    @Override
+    public void beginCommit(Long txId) {
+        if (txId <= lastSeenTxn.txnid) {
+            LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering 
recovery.", txId, lastSeenTxn);
+            long start = System.currentTimeMillis();
+            options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
+            LOG.info("Recovery took {} ms.", System.currentTimeMillis() - 
start);
+        }
+        updateIndex(txId);
+    }
+
+    @Override
+    public void commit(Long txId) {
+        try {
+            options.doCommit(txId);
+        } catch (IOException e) {
+            LOG.warn("Commit failed due to IOException. Failing the batch.", 
e);
+            throw new FailedException(e);
+        }
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector 
tridentCollector) {
+        try {
+            this.options.execute(tuples);
+        } catch (IOException e) {
+            LOG.warn("Failing batch due to IOException.", e);
+            throw new FailedException(e);
+        }
+    }
+
+    /**
+     * for unit tests
+     */
+    void close() throws IOException {
+        this.options.closeOutputFile();
+    }
+
     public static abstract class Options implements Serializable {
 
         protected String fsUrl;
         protected String configKey;
         protected transient FileSystem fs;
-        private Path currentFile;
         protected FileRotationPolicy rotationPolicy;
         protected FileNameFormat fileNameFormat;
         protected int rotation = 0;
         protected transient Configuration hdfsConfig;
         protected ArrayList<RotationAction> rotationActions = new 
ArrayList<RotationAction>();
-
+        private Path currentFile;
 
         abstract void closeOutputFile() throws IOException;
 
@@ -78,7 +212,7 @@ public class HdfsState implements State {
 
         abstract void doPrepare(Map<String, Object> conf, int partitionIndex, 
int numPartitions) throws IOException;
 
-        abstract long getCurrentOffset() throws  IOException;
+        abstract long getCurrentOffset() throws IOException;
 
         abstract void doCommit(Long txId) throws IOException;
 
@@ -143,8 +277,7 @@ public class HdfsState implements State {
         }
 
         /**
-         * Recovers nBytes from srcFile to the new file created
-         * by calling rotateOutputFile and then deletes the srcFile.
+         * Recovers nBytes from srcFile to the new file created by calling 
rotateOutputFile and then deletes the srcFile.
          */
         private void recover(String srcFile, long nBytes) {
             try {
@@ -169,10 +302,10 @@ public class HdfsState implements State {
 
     public static class HdfsFileOptions extends Options {
 
-        private transient FSDataOutputStream out;
         protected RecordFormat format;
+        private transient FSDataOutputStream out;
         private long offset = 0;
-        private int bufferSize =  131072; // default 128 K
+        private int bufferSize = 131072; // default 128 K
 
         public HdfsFileOptions withFsUrl(String fsUrl) {
             this.fsUrl = fsUrl;
@@ -360,24 +493,25 @@ public class HdfsState implements State {
         @Override
         void doRecover(Path srcPath, long nBytes) throws Exception {
             SequenceFile.Reader reader = new 
SequenceFile.Reader(this.hdfsConfig,
-                    SequenceFile.Reader.file(srcPath), 
SequenceFile.Reader.length(nBytes));
+                                                                 
SequenceFile.Reader.file(srcPath), SequenceFile.Reader.length(nBytes));
 
             Writable key = (Writable) this.format.keyClass().newInstance();
             Writable value = (Writable) this.format.valueClass().newInstance();
-            while(reader.next(key, value)) {
+            while (reader.next(key, value)) {
                 this.writer.append(key, value);
             }
         }
 
         @Override
         Path createOutputFile() throws IOException {
-            Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), 
this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
+            Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(),
+                              this.fileNameFormat.getName(this.rotation, 
System.currentTimeMillis()));
             this.writer = SequenceFile.createWriter(
-                    this.hdfsConfig,
-                    SequenceFile.Writer.file(p),
-                    SequenceFile.Writer.keyClass(this.format.keyClass()),
-                    SequenceFile.Writer.valueClass(this.format.valueClass()),
-                    SequenceFile.Writer.compression(this.compressionType, 
this.codecFactory.getCodecByName(this.compressionCodec))
+                this.hdfsConfig,
+                SequenceFile.Writer.file(p),
+                SequenceFile.Writer.keyClass(this.format.keyClass()),
+                SequenceFile.Writer.valueClass(this.format.valueClass()),
+                SequenceFile.Writer.compression(this.compressionType, 
this.codecFactory.getCodecByName(this.compressionCodec))
             );
             return p;
         }
@@ -418,138 +552,4 @@ public class HdfsState implements State {
             return Long.toString(txnid) + "," + dataFilePath + "," + 
Long.toString(offset);
         }
     }
-
-
-    public static final Logger LOG = LoggerFactory.getLogger(HdfsState.class);
-    private Options options;
-    private volatile TxnRecord lastSeenTxn;
-    private Path indexFilePath;
-
-    HdfsState(Options options) {
-        this.options = options;
-    }
-
-    void prepare(Map<String, Object> conf, IMetricsContext metrics, int 
partitionIndex, int numPartitions) {
-        this.options.prepare(conf, partitionIndex, numPartitions);
-        initLastTxn(conf, partitionIndex);
-    }
-
-    private TxnRecord readTxnRecord(Path path) throws IOException {
-        FSDataInputStream inputStream = null;
-        try {
-            inputStream = this.options.fs.open(path);
-            BufferedReader reader = new BufferedReader(new 
InputStreamReader(inputStream));
-            String line;
-            if ((line = reader.readLine()) != null) {
-                String[] fields = line.split(",");
-                return new TxnRecord(Long.valueOf(fields[0]), fields[1], 
Long.valueOf(fields[2]));
-            }
-        } finally {
-            if (inputStream != null) {
-                inputStream.close();
-            }
-        }
-        return new TxnRecord(0, options.currentFile.toString(), 0);
-    }
-
-    /**
-     * Returns temp file path corresponding to a file name.
-     */
-    private Path tmpFilePath(String filename) {
-        return new Path(filename + ".tmp");
-    }
-    /**
-     * Reads the last txn record from index file if it exists, if not
-     * from .tmp file if exists.
-     *
-     * @param indexFilePath the index file path
-     * @return the txn record from the index file or a default initial record.
-     * @throws IOException
-     */
-    private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
-        Path tmpPath = tmpFilePath(indexFilePath.toString());
-        if (this.options.fs.exists(indexFilePath)) {
-            return readTxnRecord(indexFilePath);
-        } else if (this.options.fs.exists(tmpPath)) {
-            return readTxnRecord(tmpPath);
-        }
-        return new TxnRecord(0, options.currentFile.toString(), 0);
-    }
-
-    private void initLastTxn(Map<String, Object> conf, int partition) {
-        // include partition id in the file name so that index for different 
partitions are independent.
-        String indexFileName = String.format(".index.%s.%d", 
conf.get(Config.TOPOLOGY_NAME), partition);
-        this.indexFilePath = new Path(options.fileNameFormat.getPath(), 
indexFileName);
-        try {
-            this.lastSeenTxn = getTxnRecord(indexFilePath);
-            LOG.debug("initLastTxn updated lastSeenTxn to [{}]", 
this.lastSeenTxn);
-        } catch (IOException e) {
-            LOG.warn("initLastTxn failed due to IOException.", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void updateIndex(long txId) {
-        LOG.debug("Starting index update.");
-        final Path tmpPath = tmpFilePath(indexFilePath.toString());
-
-        try (FSDataOutputStream out = this.options.fs.create(tmpPath, true);
-                BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out))) {
-            TxnRecord txnRecord = new TxnRecord(txId, 
options.currentFile.toString(), this.options.getCurrentOffset());
-            bw.write(txnRecord.toString());
-            bw.newLine();
-            bw.flush();
-            out.close();       /* In non error scenarios, for the Azure Data 
Lake Store File System (adl://),
-                               the output stream must be closed before the 
file associated with it is deleted.
-                               For ADLFS deleting the file also removes any 
handles to the file, hence out.close() will fail. */
-            /*
-             * Delete the current index file and rename the tmp file to 
atomically
-             * replace the index file. Orphan .tmp files are handled in 
getTxnRecord.
-             */
-            options.fs.delete(this.indexFilePath, false);
-            options.fs.rename(tmpPath, this.indexFilePath);
-            lastSeenTxn = txnRecord;
-            LOG.debug("updateIndex updated lastSeenTxn to [{}]", 
this.lastSeenTxn);
-        } catch (IOException e) {
-            LOG.warn("Begin commit failed due to IOException. Failing batch", 
e);
-            throw new FailedException(e);
-        }
-    }
-
-    @Override
-    public void beginCommit(Long txId) {
-        if (txId <= lastSeenTxn.txnid) {
-            LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering 
recovery.", txId, lastSeenTxn);
-            long start = System.currentTimeMillis();
-            options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
-            LOG.info("Recovery took {} ms.", System.currentTimeMillis() - 
start);
-        }
-        updateIndex(txId);
-    }
-
-    @Override
-    public void commit(Long txId) {
-        try {
-            options.doCommit(txId);
-        } catch (IOException e) {
-            LOG.warn("Commit failed due to IOException. Failing the batch.", 
e);
-            throw new FailedException(e);
-        }
-    }
-
-    public void updateState(List<TridentTuple> tuples, TridentCollector 
tridentCollector) {
-        try {
-            this.options.execute(tuples);
-        } catch (IOException e) {
-            LOG.warn("Failing batch due to IOException.", e);
-            throw new FailedException(e);
-        }
-    }
-
-    /**
-     * for unit tests
-     */
-    void close() throws IOException {
-        this.options.closeOutputFile();
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java
index e76ec22..568f8bc 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java
@@ -15,23 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident;
 
+import java.util.Map;
 import org.apache.storm.task.IMetricsContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.state.StateFactory;
-
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HdfsStateFactory implements StateFactory {
     private static final Logger LOG = 
LoggerFactory.getLogger(HdfsStateFactory.class);
     private HdfsState.Options options;
 
-    public HdfsStateFactory(){}
+    public HdfsStateFactory() {}
 
-    public HdfsStateFactory withOptions(HdfsState.Options options){
+    public HdfsStateFactory withOptions(HdfsState.Options options) {
         this.options = options;
         return this;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsUpdater.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsUpdater.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsUpdater.java
index c603334..a63bb40 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsUpdater.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsUpdater.java
@@ -15,15 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident;
 
+import java.util.List;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.BaseStateUpdater;
 import org.apache.storm.trident.tuple.TridentTuple;
 
-import java.util.List;
-
-public class HdfsUpdater extends BaseStateUpdater<HdfsState>{
+public class HdfsUpdater extends BaseStateUpdater<HdfsState> {
     @Override
     public void updateState(HdfsState state, List<TridentTuple> tuples, 
TridentCollector collector) {
         state.updateState(tuples, collector);

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
index a952b36..825a0f0 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident.format;
 
 import java.util.Map;
@@ -45,7 +40,7 @@ public class DefaultFileNameFormat implements FileNameFormat {
      * @param prefix
      * @return
      */
-    public DefaultFileNameFormat withPrefix(String prefix){
+    public DefaultFileNameFormat withPrefix(String prefix) {
         this.prefix = prefix;
         return this;
     }
@@ -56,12 +51,12 @@ public class DefaultFileNameFormat implements 
FileNameFormat {
      * @param extension
      * @return
      */
-    public DefaultFileNameFormat withExtension(String extension){
+    public DefaultFileNameFormat withExtension(String extension) {
         this.extension = extension;
         return this;
     }
 
-    public DefaultFileNameFormat withPath(String path){
+    public DefaultFileNameFormat withPath(String path) {
         this.path = path;
         return this;
     }
@@ -74,10 +69,10 @@ public class DefaultFileNameFormat implements 
FileNameFormat {
 
     @Override
     public String getName(long rotation, long timeStamp) {
-        return this.prefix + "-" + this.partitionIndex +  "-" + rotation + "-" 
+ timeStamp + this.extension;
+        return this.prefix + "-" + this.partitionIndex + "-" + rotation + "-" 
+ timeStamp + this.extension;
     }
 
-    public String getPath(){
+    public String getPath() {
         return this.path;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultSequenceFormat.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultSequenceFormat.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultSequenceFormat.java
index 1336144..f33c030 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultSequenceFormat.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultSequenceFormat.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident.format;
 
 import org.apache.hadoop.io.LongWritable;
@@ -25,7 +26,6 @@ import org.apache.storm.trident.tuple.TridentTuple;
 /**
  * Basic <code>SequenceFormat</code> implementation that uses
  * <code>LongWritable</code> for keys and <code>Text</code> for values.
- *
  */
 public class DefaultSequenceFormat implements SequenceFormat {
     private transient LongWritable key;
@@ -34,13 +34,12 @@ public class DefaultSequenceFormat implements 
SequenceFormat {
     private String keyField;
     private String valueField;
 
-    public DefaultSequenceFormat(String keyField, String valueField){
+    public DefaultSequenceFormat(String keyField, String valueField) {
         this.keyField = keyField;
         this.valueField = valueField;
     }
 
 
-
     @Override
     public Class keyClass() {
         return LongWritable.class;
@@ -53,8 +52,8 @@ public class DefaultSequenceFormat implements SequenceFormat {
 
     @Override
     public Writable key(TridentTuple tuple) {
-        if(this.key == null){
-            this.key  = new LongWritable();
+        if (this.key == null) {
+            this.key = new LongWritable();
         }
         this.key.set(tuple.getLongByField(this.keyField));
         return this.key;
@@ -62,7 +61,7 @@ public class DefaultSequenceFormat implements SequenceFormat {
 
     @Override
     public Writable value(TridentTuple tuple) {
-        if(this.value == null){
+        if (this.value == null) {
             this.value = new Text();
         }
         this.value.set(tuple.getStringByField(this.valueField));

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java
index a08664d..e21fede 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java
@@ -1,24 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident.format;
 
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
 
 /**
  * RecordFormat implementation that uses field and record delimiters.
@@ -40,7 +35,7 @@ public class DelimitedRecordFormat implements RecordFormat {
      * @param fields
      * @return
      */
-    public DelimitedRecordFormat withFields(Fields fields){
+    public DelimitedRecordFormat withFields(Fields fields) {
         this.fields = fields;
         return this;
     }
@@ -51,7 +46,7 @@ public class DelimitedRecordFormat implements RecordFormat {
      * @param delimiter
      * @return
      */
-    public DelimitedRecordFormat withFieldDelimiter(String delimiter){
+    public DelimitedRecordFormat withFieldDelimiter(String delimiter) {
         this.fieldDelimiter = delimiter;
         return this;
     }
@@ -62,7 +57,7 @@ public class DelimitedRecordFormat implements RecordFormat {
      * @param delimiter
      * @return
      */
-    public DelimitedRecordFormat withRecordDelimiter(String delimiter){
+    public DelimitedRecordFormat withRecordDelimiter(String delimiter) {
         this.recordDelimiter = delimiter;
         return this;
     }
@@ -71,9 +66,9 @@ public class DelimitedRecordFormat implements RecordFormat {
     public byte[] format(TridentTuple tuple) {
         StringBuilder sb = new StringBuilder();
         int size = this.fields.size();
-        for(int i = 0; i < size; i++){
+        for (int i = 0; i < size; i++) {
             sb.append(tuple.getValueByField(fields.get(i)));
-            if(i != size - 1){
+            if (i != size - 1) {
                 sb.append(this.fieldDelimiter);
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
index c5b0698..fbd8f5a 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident.format;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java
index 76179d9..1cc5363 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java
@@ -1,26 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.hdfs.trident.format;
 
+package org.apache.storm.hdfs.trident.format;
 
-import org.apache.storm.trident.tuple.TridentTuple;
 
 import java.io.Serializable;
+import org.apache.storm.trident.tuple.TridentTuple;
 
 /**
  * Formats a Tuple object into a byte array

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java
index b4d6c5c..497d045 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java
@@ -15,16 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident.format;
 
+import java.io.Serializable;
 import org.apache.hadoop.io.Writable;
 import org.apache.storm.trident.tuple.TridentTuple;
 
-import java.io.Serializable;
-
 /**
  * Interface for converting <code>TridentTuple</code> objects to HDFS sequence 
file key-value pairs.
- *
  */
 public interface SequenceFormat extends Serializable {
     /**
@@ -36,6 +35,7 @@ public interface SequenceFormat extends Serializable {
 
     /**
      * Value class used by implementation (e.g. Text.class, etc.)
+     *
      * @return
      */
     Class valueClass();
@@ -50,6 +50,7 @@ public interface SequenceFormat extends Serializable {
 
     /**
      * Given a tuple, return the value that should be written to the sequence 
file.
+     *
      * @param tuple
      * @return
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
index c676324..068390f 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
@@ -1,27 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident.format;
 
 import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Map;
-
 import org.apache.storm.utils.Utils;
 
 public class SimpleFileNameFormat implements FileNameFormat {
@@ -39,10 +33,10 @@ public class SimpleFileNameFormat implements FileNameFormat 
{
         // compile parameters
         SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormat);
         String ret = name
-                .replace("$TIME", dateFormat.format(new Date(timeStamp)))
-                .replace("$NUM", String.valueOf(rotation))
-                .replace("$HOST", host)
-                .replace("$PARTITION", String.valueOf(partitionIndex));
+            .replace("$TIME", dateFormat.format(new Date(timeStamp)))
+            .replace("$NUM", String.valueOf(rotation))
+            .replace("$HOST", host)
+            .replace("$PARTITION", String.valueOf(partitionIndex));
         return ret;
     }
 
@@ -73,7 +67,7 @@ public class SimpleFileNameFormat implements FileNameFormat {
      * $NUM - rotation number<br/>
      * $HOST - local host name<br/>
      * $PARTITION - partition index<br/>
-     * 
+     *
      * @param name
      *            file name
      * @return
@@ -85,10 +79,10 @@ public class SimpleFileNameFormat implements FileNameFormat 
{
 
     public SimpleFileNameFormat withTimeFormat(String timeFormat) {
         //check format
-        try{
+        try {
             new SimpleDateFormat(timeFormat);
-        }catch (Exception e) {
-            throw new IllegalArgumentException("invalid timeFormat: 
"+e.getMessage());
+        } catch (Exception e) {
+            throw new IllegalArgumentException("invalid timeFormat: " + 
e.getMessage());
         }
         this.timeFormat = timeFormat;
         return this;

Reply via email to