STORM-2810: Fix resource leaks in storm-hdfs tests

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a0308efd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a0308efd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a0308efd

Branch: refs/heads/1.1.x-branch
Commit: a0308efd6c934416206183561194f3470e415edb
Parents: ed005ab
Author: Stig Rohde Døssing <[email protected]>
Authored: Sat Nov 11 18:23:00 2017 +0100
Committer: Stig Rohde Døssing <[email protected]>
Committed: Tue Nov 14 07:53:33 2017 +0100

----------------------------------------------------------------------
 .../storm/hdfs/blobstore/HdfsBlobStoreImpl.java |   14 +-
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |   28 +-
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 1484 +++++++++---------
 .../hdfs/avro/TestFixedAvroSerializer.java      |   13 +-
 .../hdfs/avro/TestGenericAvroSerializer.java    |   11 +-
 .../storm/hdfs/blobstore/BlobStoreTest.java     |  897 ++++++-----
 .../hdfs/blobstore/HdfsBlobStoreImplTest.java   |  288 ++--
 .../hdfs/bolt/AvroGenericRecordBoltTest.java    |  142 +-
 .../apache/storm/hdfs/bolt/TestHdfsBolt.java    |  108 +-
 .../storm/hdfs/bolt/TestSequenceFileBolt.java   |   71 +-
 .../apache/storm/hdfs/spout/TestDirLock.java    |  281 ++--
 .../apache/storm/hdfs/spout/TestFileLock.java   |  679 ++++----
 .../storm/hdfs/spout/TestHdfsSemantics.java     |  311 ++--
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 1235 +++++++--------
 .../storm/hdfs/spout/TestProgressTracker.java   |  179 +--
 .../storm/hdfs/testing/MiniDFSClusterRule.java  |   78 +
 16 files changed, 2945 insertions(+), 2874 deletions(-)
----------------------------------------------------------------------
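
Note on the shape of the fix: the leaked resources were MiniDFSCluster
instances and Timer threads that tests and helper classes started but
never tore down. The new MiniDFSClusterRule (78 insertions per the
diffstat) moves cluster setup/teardown into a shared JUnit rule. Its
source is not reproduced in this excerpt; a minimal sketch of such a
rule, assuming JUnit 4's ExternalResource (method names here are
illustrative, not necessarily the committed API):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.junit.rules.ExternalResource;

    public class MiniDFSClusterRule extends ExternalResource {

        private Configuration hadoopConf;
        private MiniDFSCluster dfsCluster;

        @Override
        protected void before() throws Throwable {
            // Start an in-process HDFS cluster before the test, or once per
            // class when declared as a @ClassRule.
            hadoopConf = new Configuration();
            dfsCluster = new MiniDFSCluster.Builder(hadoopConf).build();
        }

        @Override
        protected void after() {
            // Runs even when the test fails, so a broken test can no longer
            // leak the cluster.
            if (dfsCluster != null) {
                dfsCluster.shutdown();
            }
        }

        public MiniDFSCluster getDfsCluster() {
            return dfsCluster;
        }
    }

Tests declare the rule with @Rule or @ClassRule instead of hand-rolling
setUp/tearDown in every test class.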


http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
index a4c88ce..e90e2d3 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
@@ -45,9 +45,10 @@ public class HdfsBlobStoreImpl {
 
     private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l;
     private static final int BUCKETS = 1024;
-    private static final Timer timer = new Timer("HdfsBlobStore cleanup thread", true);
     private static final String BLOBSTORE_DATA = "data";
 
+    private Timer timer;
+
     public class KeyInHashDirIterator implements Iterator<String> {
         private int currentBucket = 0;
         private Iterator<String> it = null;
@@ -112,7 +113,6 @@ public class HdfsBlobStoreImpl {
 
     private Path _fullPath;
     private FileSystem _fs;
-    private TimerTask _cleanup = null;
     private Configuration _hadoopConf;
 
     // blobstore directory is private!
@@ -141,7 +141,7 @@ public class HdfsBlobStoreImpl {
         Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
         if (Utils.getBoolean(shouldCleanup, false)) {
             LOG.debug("Starting hdfs blobstore cleaner");
-            _cleanup = new TimerTask() {
+            TimerTask cleanup = new TimerTask() {
                 @Override
                 public void run() {
                     try {
@@ -151,7 +151,8 @@ public class HdfsBlobStoreImpl {
                     }
                 }
             };
-            timer.scheduleAtFixedRate(_cleanup, 0, FULL_CLEANUP_FREQ);
+            timer = new Timer("HdfsBlobStore cleanup thread", true);
+            timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ);
         }
     }
 
@@ -304,9 +305,8 @@ public class HdfsBlobStoreImpl {
     }
 
     public void shutdown() {
-        if (_cleanup != null) {
-            _cleanup.cancel();
-            _cleanup = null;
+        if (timer != null) {
+            timer.cancel();
         }
     }
 }
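
The hunk above closes a thread leak: the cleanup Timer used to be static
and shared, created eagerly and never cancelled, so its daemon thread
outlived every HdfsBlobStoreImpl instance. It is now created per
instance, only when cleanup is enabled, and cancelled in shutdown().
Reduced to its essentials, the ownership pattern looks like this (a
sketch, not the class itself):

    import java.util.Timer;
    import java.util.TimerTask;

    class TimerOwner {
        private Timer timer; // was: static and shared, hence never cancelled

        void start(long periodMs) {
            timer = new Timer("cleanup", true); // daemon, created on demand
            timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    // periodic cleanup work goes here
                }
            }, 0, periodMs);
        }

        void shutdown() {
            if (timer != null) {
                timer.cancel(); // cancels the task and lets the thread exit
            }
        }
    }

Callers, the tests in particular, are expected to invoke shutdown() in
their teardown path so the timer thread does not outlive the store.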

http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index 12f835c..7a2402c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -49,6 +50,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 
 public abstract class AbstractHdfsBolt extends BaseRichBolt {
+
     private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);
     private static final Integer DEFAULT_RETRY_COUNT = 3;
     /**
@@ -95,14 +97,19 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
 
     /**
     * Marked as final to prevent override. Subclasses should implement the doPrepare() method.
+     *
      * @param conf
      * @param topologyContext
      * @param collector
      */
     public final void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector){
         this.writeLock = new Object();
-        if (this.syncPolicy == null) throw new IllegalStateException("SyncPolicy must be specified.");
-        if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
+        if (this.syncPolicy == null) {
+            throw new IllegalStateException("SyncPolicy must be specified.");
+        }
+        if (this.rotationPolicy == null) {
+            throw new IllegalStateException("RotationPolicy must be specified.");
+        }
         if (this.fsUrl == null) {
             throw new IllegalStateException("File system URL must be specified.");
         }
@@ -215,10 +222,8 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
     }
 
     /**
-     * A tuple must be mapped to a writer based on two factors:
-     *  - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt
-     *    for an example of this)
-     *  - the directory the tuple will be partioned into
+     * A tuple must be mapped to a writer based on two factors: - bolt specific logic that must separate tuples into different files in the
+     * same directory (see the avro bolt for an example of this) - the directory the tuple will be partitioned into
      *
      * @param tuple
      * @return
@@ -252,6 +257,11 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }
 
+    @Override
+    public void cleanup() {
+        this.rotationTimer.cancel();
+    }
+
     private void syncAllWriters() throws IOException {
         for (AbstractHDFSWriter writer : writers.values()) {
             writer.sync();
@@ -281,8 +291,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
 
         final String partitionPath = this.partitioner.getPartitionPath(tuple);
         final int rotation;
-        if (rotationCounterMap.containsKey(partitionPath))
-        {
+        if (rotationCounterMap.containsKey(partitionPath)) {
             rotation = rotationCounterMap.get(partitionPath) + 1;
         } else {
             rotation = 0;
@@ -300,6 +309,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
     abstract protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException;
 
     static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> {
+
         final long maxWriters;
 
         public WritersMap(long maxWriters) {
@@ -312,4 +322,4 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
             return this.size() > this.maxWriters;
         }
     }
-}
\ No newline at end of file
+}
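
The new cleanup() override is the leak fix in this file: each bolt
creates a rotationTimer during prepare(), and previously nothing ever
cancelled it, so every bolt torn down in a test left a timer thread
behind. Storm invokes cleanup() when a bolt is shut down (guaranteed
only in local mode, which is where the tests run). A minimal sketch of
the pattern, with the surrounding bolt machinery omitted:

    import java.util.Map;
    import java.util.Timer;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    public abstract class TimerOwningBolt extends BaseRichBolt {

        protected transient Timer rotationTimer;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            // one timer per bolt instance, started alongside the bolt
            rotationTimer = new Timer("rotation", true);
        }

        @Override
        public void execute(Tuple tuple) {
            // tuple handling elided
        }

        @Override
        public void cleanup() {
            // cancel on teardown so the timer thread dies with the bolt
            rotationTimer.cancel();
        }
    }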

http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index b7627f2..38a791b 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -9,13 +10,10 @@
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-
 package org.apache.storm.hdfs.spout;
 
 import java.io.IOException;
@@ -47,800 +45,810 @@ import org.apache.storm.tuple.Fields;
 
 public class HdfsSpout extends BaseRichSpout {
 
-  // user configurable
-  private String hdfsUri;            // required
-  private String readerType;         // required
-  private Fields outputFields;       // required
+    // user configurable
+    private String hdfsUri;            // required
+    private String readerType;         // required
+    private Fields outputFields;       // required
 
-  private String sourceDir;        // required
-  private Path sourceDirPath;        // required
+    private String sourceDir;        // required
+    private Path sourceDirPath;        // required
 
-  private String archiveDir;       // required
-  private Path archiveDirPath;       // required
+    private String archiveDir;       // required
+    private Path archiveDirPath;       // required
 
-  private String badFilesDir;      // required
-  private Path badFilesDirPath;      // required
+    private String badFilesDir;      // required
+    private Path badFilesDirPath;      // required
 
-  private String lockDir;
-  private Path lockDirPath;
+    private String lockDir;
+    private Path lockDirPath;
 
-  private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT;
-  private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC;
-  private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING;
-  private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT;
-  private boolean clocksInSync = true;
+    private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT;
+    private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC;
+    private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING;
+    private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT;
+    private boolean clocksInSync = true;
 
-  private String inprogress_suffix = ".inprogress"; // not configurable to prevent change between topology restarts
-  private String ignoreSuffix = ".ignore";
+    private String inprogress_suffix = ".inprogress"; // not configurable to prevent change between topology restarts
+    private String ignoreSuffix = ".ignore";
 
-  private String outputStreamName= null;
+    private String outputStreamName = null;
 
-  // other members
-  private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class);
+    // other members
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class);
 
-  private ProgressTracker tracker = null;
+    private ProgressTracker tracker = null;
 
-  private FileSystem hdfs;
-  private FileReader reader;
+    private FileSystem hdfs;
+    private FileReader reader;
 
-  private SpoutOutputCollector collector;
-  HashMap<MessageId, List<Object> > inflight = new HashMap<>();
-  LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>();
-
-  private Configuration hdfsConfig;
-
-  private Map conf = null;
-  private FileLock lock;
-  private String spoutId = null;
-
-  HdfsUtils.Pair<Path,FileLock.LogEntry> lastExpiredLock = null;
-  private long lastExpiredLockTime = 0;
-
-  private long tupleCounter = 0;
-  private boolean ackEnabled = false;
-  private int acksSinceLastCommit = 0 ;
-  private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
-  private Timer commitTimer;
-  private boolean fileReadCompletely = true;
-
-  private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs
-
-  public HdfsSpout() {
-  }
-
-  public HdfsSpout setHdfsUri(String hdfsUri) {
-    this.hdfsUri = hdfsUri;
-    return this;
-  }
-
-  public HdfsSpout setReaderType(String readerType) {
-    this.readerType = readerType;
-    return this;
-  }
-
-  public HdfsSpout setSourceDir(String sourceDir) {
-    this.sourceDir = sourceDir;
-    return this;
-  }
-
-  public HdfsSpout setArchiveDir(String archiveDir) {
-    this.archiveDir = archiveDir;
-    return this;
-  }
-
-  public HdfsSpout setBadFilesDir(String badFilesDir) {
-    this.badFilesDir = badFilesDir;
-    return this;
-  }
-
-  public HdfsSpout setLockDir(String lockDir) {
-    this.lockDir = lockDir;
-    return this;
-  }
-
-  public HdfsSpout setCommitFrequencyCount(int commitFrequencyCount) {
-    this.commitFrequencyCount = commitFrequencyCount;
-    return this;
-  }
-
-  public HdfsSpout setCommitFrequencySec(int commitFrequencySec) {
-    this.commitFrequencySec = commitFrequencySec;
-    return this;
-  }
-
-  public HdfsSpout setMaxOutstanding(int maxOutstanding) {
-    this.maxOutstanding = maxOutstanding;
-    return this;
-  }
-
-  public HdfsSpout setLockTimeoutSec(int lockTimeoutSec) {
-    this.lockTimeoutSec = lockTimeoutSec;
-    return this;
-  }
-
-  public HdfsSpout setClocksInSync(boolean clocksInSync) {
-    this.clocksInSync = clocksInSync;
-    return this;
-  }
-
-
-  public HdfsSpout setIgnoreSuffix(String ignoreSuffix) {
-    this.ignoreSuffix = ignoreSuffix;
-    return this;
-  }
-
-  /** Output field names. Number of fields depends upon the reader type */
-  public HdfsSpout withOutputFields(String... fields) {
-    outputFields = new Fields(fields);
-    return this;
-  }
-
-  /** set key name under which HDFS options are placed. (similar to HDFS bolt).
-   * default key name is 'hdfs.config' */
-  public HdfsSpout withConfigKey(String configKey) {
-    this.configKey = configKey;
-    return this;
-  }
-
-  /**
-   * Set output stream name
-   */
-  public HdfsSpout withOutputStream(String streamName) {
-    this.outputStreamName = streamName;
-    return this;
-  }
-
-  public Path getLockDirPath() {
-    return lockDirPath;
-  }
-
-  public SpoutOutputCollector getCollector() {
-    return collector;
-  }
-
-  public void nextTuple() {
-    LOG.trace("Next Tuple {}", spoutId);
-    // 1) First re-emit any previously failed tuples (from retryList)
-    if (!retryList.isEmpty()) {
-      LOG.debug("Sending tuple from retry list");
-      HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove();
-      emitData(pair.getValue(), pair.getKey());
-      return;
-    }
-
-    if ( ackEnabled  &&  tracker.size()>= maxOutstanding ) {
-      LOG.warn("Waiting for more ACKs before generating new tuples. " +
-              "Progress tracker size has reached limit {}, SpoutID {}"
-              , maxOutstanding, spoutId);
-      // Don't emit anything .. allow configured spout wait strategy to kick in
-      return;
-    }
-
-    // 2) If no failed tuples to be retried, then send tuples from hdfs
-    while (true) {
-      try {
-        // 3) Select a new file if one is not open already
-        if (reader == null) {
-          reader = pickNextFile();
-          if (reader == null) {
-            LOG.debug("Currently no new files to process under : " + sourceDirPath);
-            return;
-          } else {
-            fileReadCompletely=false;
-          }
-        }
-        if ( fileReadCompletely ) { // wait for more ACKs before proceeding
-          return;
-        }
-        // 4) Read record from file, emit to collector and record progress
-        List<Object> tuple = reader.next();
-        if (tuple != null) {
-          fileReadCompletely= false;
-          ++tupleCounter;
-          MessageId msgId = new MessageId(tupleCounter, reader.getFilePath(), reader.getFileOffset());
-          emitData(tuple, msgId);
-
-          if ( !ackEnabled ) {
-            ++acksSinceLastCommit; // assume message is immediately ACKed in non-ack mode
-            commitProgress(reader.getFileOffset());
-          } else {
-            commitProgress(tracker.getCommitPosition());
-          }
-          return;
-        } else {
-          fileReadCompletely = true;
-          if ( !ackEnabled ) {
-            markFileAsDone(reader.getFilePath());
-          }
-        }
-      } catch (IOException e) {
-        LOG.error("I/O Error processing at file location " + getFileProgress(reader), e);
-        // don't emit anything .. allow configured spout wait strategy to kick in
-        return;
-      } catch (ParseException e) {
-        LOG.error("Parsing error when processing at file location " + getFileProgress(reader) +
-                ". Skipping remainder of file.", e);
-        markFileAsBad(reader.getFilePath());
-        // Note: We don't return from this method on ParseException to avoid triggering the
-        // spout wait strategy (due to no emits). Instead we go back into the loop and
-        // generate a tuple from next file
-      }
-    } // while
-  }
-
-  // will commit progress into lock file if commit threshold is reached
-  private void commitProgress(FileOffset position) {
-    if ( position==null ) {
-      return;
-    }
-    if ( lock!=null && canCommitNow() ) {
-      try {
-        String pos = position.toString();
-        lock.heartbeat(pos);
-        LOG.debug("{} Committed progress. {}", spoutId, pos);
-        acksSinceLastCommit = 0;
-        commitTimeElapsed.set(false);
-        setupCommitElapseTimer();
-      } catch (IOException e) {
-        LOG.error("Unable to commit progress Will retry later. Spout ID = " + spoutId, e);
-      }
-    }
-  }
-
-  private void setupCommitElapseTimer() {
-    if ( commitFrequencySec<=0 ) {
-      return;
-    }
-    TimerTask timerTask = new TimerTask() {
-      @Override
-      public void run() {
-        commitTimeElapsed.set(true);
-      }
-    };
-    commitTimer.schedule(timerTask, commitFrequencySec * 1000);
-  }
-
-  private static String getFileProgress(FileReader reader) {
-    return reader.getFilePath() + " " + reader.getFileOffset();
-  }
-
-  private void markFileAsDone(Path filePath) {
-    try {
-      Path newFile = renameCompletedFile(reader.getFilePath());
-      LOG.info("Completed processing {}. Spout Id = {}", newFile, spoutId);
-    } catch (IOException e) {
-      LOG.error("Unable to archive completed file" + filePath + " Spout ID " + spoutId, e);
-    }
-    closeReaderAndResetTrackers();
-  }
-
-  private void markFileAsBad(Path file) {
-    String fileName = file.toString();
-    String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
-    String originalName = new Path(fileNameMinusSuffix).getName();
-    Path  newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName);
-
-    LOG.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= {}", originalName, newFile, tracker.getCommitPosition(), spoutId);
-    try {
-      if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception
-        throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception
-      }
-    } catch (IOException e) {
-      LOG.warn("Error moving bad file: " + file + " to destination " + newFile + " SpoutId =" + spoutId, e);
-    }
-    closeReaderAndResetTrackers();
-  }
-
-  private void closeReaderAndResetTrackers() {
-    inflight.clear();
-    tracker.offsets.clear();
-    retryList.clear();
-
-    reader.close();
-    reader = null;
-    releaseLockAndLog(lock, spoutId);
-    lock = null;
-  }
-
-  private static void releaseLockAndLog(FileLock fLock, String spoutId) {
-    try {
-      if ( fLock!=null ) {
-        fLock.release();
-        LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId);
-      }
-    } catch (IOException e) {
-      LOG.error("Unable to delete lock file : " +fLock.getLockFile() + " SpoutId =" + spoutId, e);
-    }
-  }
-
-  protected void emitData(List<Object> tuple, MessageId id) {
-    LOG.trace("Emitting - {}", id);
-
-    if ( outputStreamName==null )
-      collector.emit( tuple, id );
-    else
-      collector.emit( outputStreamName, tuple, id );
-
-    inflight.put(id, tuple);
-  }
-
-  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-    LOG.info("Opening HDFS Spout");
-    this.conf = conf;
-    this.commitTimer = new Timer();
-    this.tracker = new ProgressTracker();
-    this.hdfsConfig = new Configuration();
-
-    this.collector = collector;
-    this.hdfsConfig = new Configuration();
-    this.tupleCounter = 0;
-
-    // Hdfs related settings
-    if ( this.hdfsUri==null && conf.containsKey(Configs.HDFS_URI) ) {
-      this.hdfsUri = conf.get(Configs.HDFS_URI).toString();
-    }
-    if ( this.hdfsUri==null ) {
-      throw new RuntimeException("HDFS Uri not set on spout");
-    }
-
-    try {
-      this.hdfs = FileSystem.get(URI.create(hdfsUri), hdfsConfig);
-    } catch (IOException e) {
-      LOG.error("Unable to instantiate file system", e);
-      throw new RuntimeException("Unable to instantiate file system", e);
-    }
-
-
-    if ( conf.containsKey(configKey) ) {
-      Map<String, Object> map = (Map<String, Object>)conf.get(configKey);
-        if ( map != null ) {
-          for(String keyName : map.keySet()){
-            LOG.info("HDFS Config override : {} = {} ", keyName, String.valueOf(map.get(keyName)));
-            this.hdfsConfig.set(keyName, String.valueOf(map.get(keyName)));
-          }
-          try {
-            HdfsSecurityUtil.login(conf, hdfsConfig);
-          } catch (IOException e) {
-            LOG.error("HDFS Login failed ", e);
-            throw new RuntimeException(e);
-          }
-        } // if (map != null)
-      }
+    private SpoutOutputCollector collector;
+    HashMap<MessageId, List<Object>> inflight = new HashMap<>();
+    LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>();
+
+    private Configuration hdfsConfig;
+
+    private Map conf = null;
+    private FileLock lock;
+    private String spoutId = null;
+
+    HdfsUtils.Pair<Path, FileLock.LogEntry> lastExpiredLock = null;
+    private long lastExpiredLockTime = 0;
+
+    private long tupleCounter = 0;
+    private boolean ackEnabled = false;
+    private int acksSinceLastCommit = 0;
+    private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
+    private Timer commitTimer;
+    private boolean fileReadCompletely = true;
+
+    private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs
 
-    // Reader type config
-    if ( readerType==null && conf.containsKey(Configs.READER_TYPE) ) {
-      readerType = conf.get(Configs.READER_TYPE).toString();
+    public HdfsSpout() {
     }
-    checkValidReader(readerType);
 
-    // -- source dir config
-    if ( sourceDir==null && conf.containsKey(Configs.SOURCE_DIR) ) {
-      sourceDir = conf.get(Configs.SOURCE_DIR).toString();
+    public HdfsSpout setHdfsUri(String hdfsUri) {
+        this.hdfsUri = hdfsUri;
+        return this;
     }
-    if ( sourceDir==null ) {
-      LOG.error(Configs.SOURCE_DIR + " setting is required");
-      throw new RuntimeException(Configs.SOURCE_DIR + " setting is required");
+
+    public HdfsSpout setReaderType(String readerType) {
+        this.readerType = readerType;
+        return this;
+    }
+
+    public HdfsSpout setSourceDir(String sourceDir) {
+        this.sourceDir = sourceDir;
+        return this;
     }
-    this.sourceDirPath = new Path( sourceDir );
 
-    // -- archive dir config
-    if ( archiveDir==null && conf.containsKey(Configs.ARCHIVE_DIR) ) {
-      archiveDir = conf.get(Configs.ARCHIVE_DIR).toString();
+    public HdfsSpout setArchiveDir(String archiveDir) {
+        this.archiveDir = archiveDir;
+        return this;
     }
-    if ( archiveDir==null ) {
-      LOG.error(Configs.ARCHIVE_DIR + " setting is required");
-      throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required");
+
+    public HdfsSpout setBadFilesDir(String badFilesDir) {
+        this.badFilesDir = badFilesDir;
+        return this;
+    }
+
+    public HdfsSpout setLockDir(String lockDir) {
+        this.lockDir = lockDir;
+        return this;
     }
-    this.archiveDirPath = new Path( archiveDir );
-    validateOrMakeDir(hdfs, archiveDirPath, "Archive");
 
-    // -- bad files dir config
-    if ( badFilesDir==null && conf.containsKey(Configs.BAD_DIR) ) {
-      badFilesDir = conf.get(Configs.BAD_DIR).toString();
+    public HdfsSpout setCommitFrequencyCount(int commitFrequencyCount) {
+        this.commitFrequencyCount = commitFrequencyCount;
+        return this;
     }
-    if ( badFilesDir==null ) {
-      LOG.error(Configs.BAD_DIR + " setting is required");
-      throw new RuntimeException(Configs.BAD_DIR + " setting is required");
+
+    public HdfsSpout setCommitFrequencySec(int commitFrequencySec) {
+        this.commitFrequencySec = commitFrequencySec;
+        return this;
+    }
+
+    public HdfsSpout setMaxOutstanding(int maxOutstanding) {
+        this.maxOutstanding = maxOutstanding;
+        return this;
+    }
+
+    public HdfsSpout setLockTimeoutSec(int lockTimeoutSec) {
+        this.lockTimeoutSec = lockTimeoutSec;
+        return this;
     }
-    this.badFilesDirPath = new Path(badFilesDir);
-    validateOrMakeDir(hdfs, badFilesDirPath, "bad files");
 
-    // -- ignore file names config
-    if ( conf.containsKey(Configs.IGNORE_SUFFIX) ) {
-      this.ignoreSuffix = conf.get(Configs.IGNORE_SUFFIX).toString();
+    public HdfsSpout setClocksInSync(boolean clocksInSync) {
+        this.clocksInSync = clocksInSync;
+        return this;
     }
 
-    // -- lock dir config
-    if ( lockDir==null && conf.containsKey(Configs.LOCK_DIR) ) {
-      lockDir = conf.get(Configs.LOCK_DIR).toString();
+    public HdfsSpout setIgnoreSuffix(String ignoreSuffix) {
+        this.ignoreSuffix = ignoreSuffix;
+        return this;
     }
-    if ( lockDir==null ) {
-      lockDir = getDefaultLockDir(sourceDirPath);
+
+    /**
+     * Output field names. Number of fields depends upon the reader type
+     */
+    public HdfsSpout withOutputFields(String... fields) {
+        outputFields = new Fields(fields);
+        return this;
     }
-    this.lockDirPath = new Path(lockDir);
-    validateOrMakeDir(hdfs,lockDirPath, "locks");
 
+    /**
+     * set key name under which HDFS options are placed. (similar to HDFS bolt). default key name is 'hdfs.config'
+     */
+    public HdfsSpout withConfigKey(String configKey) {
+        this.configKey = configKey;
+        return this;
+    }
 
-    // -- lock timeout
-    if ( conf.get(Configs.LOCK_TIMEOUT) !=null ) {
-      this.lockTimeoutSec = Integer.parseInt(conf.get(Configs.LOCK_TIMEOUT).toString());
+    /**
+     * Set output stream name
+     */
+    public HdfsSpout withOutputStream(String streamName) {
+        this.outputStreamName = streamName;
+        return this;
     }
 
-    // -- enable/disable ACKing
-    Object ackers = conf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
-    if ( ackers!=null ) {
-      int ackerCount = Integer.parseInt(ackers.toString());
-      this.ackEnabled = (ackerCount>0);
-      LOG.debug("ACKer count = {}", ackerCount);
+    public Path getLockDirPath() {
+        return lockDirPath;
     }
-    else { // ackers==null when ackerCount not explicitly set on the topology
-      this.ackEnabled = true;
-      LOG.debug("ACK count not explicitly set on topology.");
+
+    public SpoutOutputCollector getCollector() {
+        return collector;
     }
 
-    LOG.info("ACK mode is {}", ackEnabled ? "enabled" : "disabled");
+    public void nextTuple() {
+        LOG.trace("Next Tuple {}", spoutId);
+        // 1) First re-emit any previously failed tuples (from retryList)
+        if (!retryList.isEmpty()) {
+            LOG.debug("Sending tuple from retry list");
+            HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove();
+            emitData(pair.getValue(), pair.getKey());
+            return;
+        }
 
-    // -- commit frequency - count
-    if ( conf.get(Configs.COMMIT_FREQ_COUNT) != null ) {
-      commitFrequencyCount = Integer.parseInt(conf.get(Configs.COMMIT_FREQ_COUNT).toString());
+        if (ackEnabled && tracker.size() >= maxOutstanding) {
+            LOG.warn("Waiting for more ACKs before generating new tuples. "
+                + "Progress tracker size has reached limit {}, SpoutID {}",
+                 maxOutstanding, spoutId);
+            // Don't emit anything .. allow configured spout wait strategy to kick in
+            return;
+        }
+
+        // 2) If no failed tuples to be retried, then send tuples from hdfs
+        while (true) {
+            try {
+                // 3) Select a new file if one is not open already
+                if (reader == null) {
+                    reader = pickNextFile();
+                    if (reader == null) {
+                        LOG.debug("Currently no new files to process under : " + sourceDirPath);
+                        return;
+                    } else {
+                        fileReadCompletely = false;
+                    }
+                }
+                if (fileReadCompletely) { // wait for more ACKs before proceeding
+                    return;
+                }
+                // 4) Read record from file, emit to collector and record progress
+                List<Object> tuple = reader.next();
+                if (tuple != null) {
+                    fileReadCompletely = false;
+                    ++tupleCounter;
+                    MessageId msgId = new MessageId(tupleCounter, reader.getFilePath(), reader.getFileOffset());
+                    emitData(tuple, msgId);
+
+                    if (!ackEnabled) {
+                        ++acksSinceLastCommit; // assume message is immediately ACKed in non-ack mode
+                        commitProgress(reader.getFileOffset());
+                    } else {
+                        commitProgress(tracker.getCommitPosition());
+                    }
+                    return;
+                } else {
+                    fileReadCompletely = true;
+                    if (!ackEnabled) {
+                        markFileAsDone(reader.getFilePath());
+                    }
+                }
+            } catch (IOException e) {
+                LOG.error("I/O Error processing at file location " + getFileProgress(reader), e);
+                // don't emit anything .. allow configured spout wait strategy to kick in
+                return;
+            } catch (ParseException e) {
+                LOG.error("Parsing error when processing at file location " + getFileProgress(reader)
+                    + ". Skipping remainder of file.", e);
+                markFileAsBad(reader.getFilePath());
+                // Note: We don't return from this method on ParseException to avoid triggering the
+                // spout wait strategy (due to no emits). Instead we go back into the loop and
+                // generate a tuple from next file
+            }
+        } // while
+    }
+
+    // will commit progress into lock file if commit threshold is reached
+    private void commitProgress(FileOffset position) {
+        if (position == null) {
+            return;
+        }
+        if (lock != null && canCommitNow()) {
+            try {
+                String pos = position.toString();
+                lock.heartbeat(pos);
+                LOG.debug("{} Committed progress. {}", spoutId, pos);
+                acksSinceLastCommit = 0;
+                commitTimeElapsed.set(false);
+                setupCommitElapseTimer();
+            } catch (IOException e) {
+                LOG.error("Unable to commit progress Will retry later. Spout ID = " + spoutId, e);
+            }
+        }
     }
 
-    // -- commit frequency - seconds
-    if ( conf.get(Configs.COMMIT_FREQ_SEC) != null ) {
-      commitFrequencySec = Integer.parseInt(conf.get(Configs.COMMIT_FREQ_SEC).toString());
-      if ( commitFrequencySec<=0 ) {
-        throw new RuntimeException(Configs.COMMIT_FREQ_SEC + " setting must be greater than 0");
-      }
-    }
-
-    // -- max outstanding tuples
-    if ( conf.get(Configs.MAX_OUTSTANDING) !=null ) {
-      maxOutstanding = Integer.parseInt(conf.get(Configs.MAX_OUTSTANDING).toString());
-    }
+    private void setupCommitElapseTimer() {
+        if (commitFrequencySec <= 0) {
+            return;
+        }
+        TimerTask timerTask = new TimerTask() {
+            @Override
+            public void run() {
+                commitTimeElapsed.set(true);
+            }
+        };
+        commitTimer.schedule(timerTask, commitFrequencySec * 1000);
+    }
 
-    // -- clocks in sync
-    if ( conf.get(Configs.CLOCKS_INSYNC) !=null ) {
-      clocksInSync = Boolean.parseBoolean(conf.get(Configs.CLOCKS_INSYNC).toString());
+    private static String getFileProgress(FileReader reader) {
+        return reader.getFilePath() + " " + reader.getFileOffset();
     }
 
-    // -- spout id
-    spoutId = context.getThisComponentId();
+    private void markFileAsDone(Path filePath) {
+        try {
+            Path newFile = renameCompletedFile(reader.getFilePath());
+            LOG.info("Completed processing {}. Spout Id = {}", newFile, spoutId);
+        } catch (IOException e) {
+            LOG.error("Unable to archive completed file" + filePath + " Spout ID " + spoutId, e);
+        }
+        closeReaderAndResetTrackers();
+    }
 
-    // setup timer for commit elapse time tracking
-    setupCommitElapseTimer();
-  }
+    private void markFileAsBad(Path file) {
+        String fileName = file.toString();
+        String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
+        String originalName = new Path(fileNameMinusSuffix).getName();
+        Path newFile = new Path(badFilesDirPath + Path.SEPARATOR + originalName);
 
-  private static void validateOrMakeDir(FileSystem fs, Path dir, String dirDescription) {
-    try {
-      if ( fs.exists(dir) ) {
-        if ( !fs.isDirectory(dir) ) {
-          LOG.error(dirDescription + " directory is a file, not a dir. " + dir);
-          throw new RuntimeException(dirDescription + " directory is a file, not a dir. " + dir);
+        LOG.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= {}", originalName, newFile, tracker.getCommitPosition(), spoutId);
+        try {
+            if (!hdfs.rename(file, newFile)) { // seems this can fail by returning false or throwing exception
+                throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception
+            }
+        } catch (IOException e) {
+            LOG.warn("Error moving bad file: " + file + " to destination " + newFile + " SpoutId =" + spoutId, e);
         }
-      } else if ( ! fs.mkdirs(dir) ) {
-        LOG.error("Unable to create " + dirDescription + " directory " + dir);
-        throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir);
-      }
-    } catch (IOException e) {
-      LOG.error("Unable to create " + dirDescription + " directory " + dir, e);
-      throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir, e);
+        closeReaderAndResetTrackers();
     }
-  }
 
-  private String getDefaultLockDir(Path sourceDirPath) {
-    return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR;
-  }
+    private void closeReaderAndResetTrackers() {
+        inflight.clear();
+        tracker.offsets.clear();
+        retryList.clear();
+
+        reader.close();
+        reader = null;
+        releaseLockAndLog(lock, spoutId);
+        lock = null;
+    }
 
-  private static void checkValidReader(String readerType) {
-    if ( readerType.equalsIgnoreCase(Configs.TEXT)  || readerType.equalsIgnoreCase(Configs.SEQ) )
-      return;
-    try {
-      Class<?> classType = Class.forName(readerType);
-      classType.getConstructor(FileSystem.class, Path.class, Map.class);
-      return;
-    } catch (ClassNotFoundException e) {
-      LOG.error(readerType + " not found in classpath.", e);
-      throw new IllegalArgumentException(readerType + " not found in classpath.", e);
-    } catch (NoSuchMethodException e) {
-      LOG.error(readerType + " is missing the expected constructor for Readers.", e);
-      throw new IllegalArgumentException(readerType + " is missing the expected constuctor for Readers.");
+    private static void releaseLockAndLog(FileLock fLock, String spoutId) {
+        try {
+            if (fLock != null) {
+                fLock.release();
+                LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId);
+            }
+        } catch (IOException e) {
+            LOG.error("Unable to delete lock file : " + fLock.getLockFile() + " SpoutId =" + spoutId, e);
+        }
     }
-  }
 
-  @Override
-  public void ack(Object msgId) {
-    LOG.trace("Ack received for msg {} on spout {}", msgId, spoutId);
-    if ( !ackEnabled ) {
-      return;
+    protected void emitData(List<Object> tuple, MessageId id) {
+        LOG.trace("Emitting - {}", id);
+
+        if (outputStreamName == null) {
+            collector.emit(tuple, id);
+        } else {
+            collector.emit(outputStreamName, tuple, id);
+        }
+
+        inflight.put(id, tuple);
     }
-    MessageId id = (MessageId) msgId;
-    inflight.remove(id);
-    ++acksSinceLastCommit;
-    tracker.recordAckedOffset(id.offset);
-    commitProgress(tracker.getCommitPosition());
-    if ( fileReadCompletely && inflight.isEmpty() ) {
-      markFileAsDone(reader.getFilePath());
-      reader = null;
-    }
-    super.ack(msgId);
-  }
-
-  private boolean canCommitNow() {
-
-    if ( commitFrequencyCount>0 &&  acksSinceLastCommit >= commitFrequencyCount ) {
-      return true;
-    }
-    return commitTimeElapsed.get();
-  }
-
-  @Override
-  public void fail(Object msgId) {
-    LOG.trace("Fail received for msg id {} on spout {}", msgId, spoutId);
-    super.fail(msgId);
-    if ( ackEnabled ) {
-      HdfsUtils.Pair<MessageId, List<Object>> item = HdfsUtils.Pair.of(msgId, inflight.remove(msgId));
-      retryList.add(item);
-    }
-  }
-
-  private FileReader pickNextFile() {
-    try {
-      // 1) If there are any abandoned files, pick oldest one
-      lock = getOldestExpiredLock();
-      if ( lock!=null ) {
-        LOG.debug("Spout {} now took over ownership of abandoned FileLock {}", spoutId, lock.getLockFile());
-        Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath);
-        String resumeFromOffset = lock.getLastLogEntry().fileOffset;
-        LOG.info("Resuming processing of abandoned file : {}", file);
-        return createFileReader(file, resumeFromOffset);
-      }
-
-      // 2) If no abandoned files, then pick oldest file in sourceDirPath, lock it and rename it
-      Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0);
-
-      for (Path file : listing) {
-        if (file.getName().endsWith(inprogress_suffix)) {
-          continue;
-        }
-        if (file.getName().endsWith(ignoreSuffix)) {
-          continue;
-        }
-        lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
-        if (lock == null) {
-          LOG.debug("Unable to get FileLock for {}, so skipping it.", file);
-          continue; // could not lock, so try another file.
+
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        LOG.info("Opening HDFS Spout");
+        this.conf = conf;
+        this.commitTimer = new Timer();
+        this.tracker = new ProgressTracker();
+        this.hdfsConfig = new Configuration();
+
+        this.collector = collector;
+        this.hdfsConfig = new Configuration();
+        this.tupleCounter = 0;
+
+        // Hdfs related settings
+        if (this.hdfsUri == null && conf.containsKey(Configs.HDFS_URI)) {
+            this.hdfsUri = conf.get(Configs.HDFS_URI).toString();
+        }
+        if (this.hdfsUri == null) {
+            throw new RuntimeException("HDFS Uri not set on spout");
         }
+
         try {
-          Path newFile = renameToInProgressFile(file);
-          FileReader result = createFileReader(newFile);
-          LOG.info("Processing : {} ", file);
-          return result;
+            this.hdfs = FileSystem.get(URI.create(hdfsUri), hdfsConfig);
+        } catch (IOException e) {
+            LOG.error("Unable to instantiate file system", e);
+            throw new RuntimeException("Unable to instantiate file system", e);
+        }
+
+        if (conf.containsKey(configKey)) {
+            Map<String, Object> map = (Map<String, Object>) conf.get(configKey);
+            if (map != null) {
+                for (String keyName : map.keySet()) {
+                    LOG.info("HDFS Config override : {} = {} ", keyName, String.valueOf(map.get(keyName)));
+                    this.hdfsConfig.set(keyName, String.valueOf(map.get(keyName)));
+                }
+                try {
+                    HdfsSecurityUtil.login(conf, hdfsConfig);
+                } catch (IOException e) {
+                    LOG.error("HDFS Login failed ", e);
+                    throw new RuntimeException(e);
+                }
+            } // if (map != null)
+        }
+
+        // Reader type config
+        if (readerType == null && conf.containsKey(Configs.READER_TYPE)) {
+            readerType = conf.get(Configs.READER_TYPE).toString();
+        }
+        checkValidReader(readerType);
+
+        // -- source dir config
+        if (sourceDir == null && conf.containsKey(Configs.SOURCE_DIR)) {
+            sourceDir = conf.get(Configs.SOURCE_DIR).toString();
+        }
+        if (sourceDir == null) {
+            LOG.error(Configs.SOURCE_DIR + " setting is required");
+            throw new RuntimeException(Configs.SOURCE_DIR + " setting is required");
+        }
+        this.sourceDirPath = new Path(sourceDir);
+
+        // -- archive dir config
+        if (archiveDir == null && conf.containsKey(Configs.ARCHIVE_DIR)) {
+            archiveDir = conf.get(Configs.ARCHIVE_DIR).toString();
+        }
+        if (archiveDir == null) {
+            LOG.error(Configs.ARCHIVE_DIR + " setting is required");
+            throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required");
+        }
+        this.archiveDirPath = new Path(archiveDir);
+        validateOrMakeDir(hdfs, archiveDirPath, "Archive");
+
+        // -- bad files dir config
+        if (badFilesDir == null && conf.containsKey(Configs.BAD_DIR)) {
+            badFilesDir = conf.get(Configs.BAD_DIR).toString();
+        }
+        if (badFilesDir == null) {
+            LOG.error(Configs.BAD_DIR + " setting is required");
+            throw new RuntimeException(Configs.BAD_DIR + " setting is required");
+        }
+        this.badFilesDirPath = new Path(badFilesDir);
+        validateOrMakeDir(hdfs, badFilesDirPath, "bad files");
+
+        // -- ignore file names config
+        if (conf.containsKey(Configs.IGNORE_SUFFIX)) {
+            this.ignoreSuffix = conf.get(Configs.IGNORE_SUFFIX).toString();
+        }
+
+        // -- lock dir config
+        if (lockDir == null && conf.containsKey(Configs.LOCK_DIR)) {
+            lockDir = conf.get(Configs.LOCK_DIR).toString();
+        }
+        if (lockDir == null) {
+            lockDir = getDefaultLockDir(sourceDirPath);
+        }
+        this.lockDirPath = new Path(lockDir);
+        validateOrMakeDir(hdfs, lockDirPath, "locks");
+
+        // -- lock timeout
+        if (conf.get(Configs.LOCK_TIMEOUT) != null) {
+            this.lockTimeoutSec = Integer.parseInt(conf.get(Configs.LOCK_TIMEOUT).toString());
+        }
+
+        // -- enable/disable ACKing
+        Object ackers = conf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+        if (ackers != null) {
+            int ackerCount = Integer.parseInt(ackers.toString());
+            this.ackEnabled = (ackerCount > 0);
+            LOG.debug("ACKer count = {}", ackerCount);
+        } else { // ackers==null when ackerCount not explicitly set on the topology
+            this.ackEnabled = true;
+            LOG.debug("ACK count not explicitly set on topology.");
+        }
+
+        LOG.info("ACK mode is {}", ackEnabled ? "enabled" : "disabled");
+
+        // -- commit frequency - count
+        if (conf.get(Configs.COMMIT_FREQ_COUNT) != null) {
+            commitFrequencyCount = Integer.parseInt(conf.get(Configs.COMMIT_FREQ_COUNT).toString());
+        }
+
+        // -- commit frequency - seconds
+        if (conf.get(Configs.COMMIT_FREQ_SEC) != null) {
+            commitFrequencySec = Integer.parseInt(conf.get(Configs.COMMIT_FREQ_SEC).toString());
+            if (commitFrequencySec <= 0) {
+                throw new RuntimeException(Configs.COMMIT_FREQ_SEC + " setting must be greater than 0");
+            }
+        }
+
+        // -- max outstanding tuples
+        if (conf.get(Configs.MAX_OUTSTANDING) != null) {
+            maxOutstanding = Integer.parseInt(conf.get(Configs.MAX_OUTSTANDING).toString());
+        }
+
+        // -- clocks in sync
+        if (conf.get(Configs.CLOCKS_INSYNC) != null) {
+            clocksInSync = Boolean.parseBoolean(conf.get(Configs.CLOCKS_INSYNC).toString());
+        }
+
+        // -- spout id
+        spoutId = context.getThisComponentId();
+
+        // setup timer for commit elapse time tracking
+        setupCommitElapseTimer();
+    }
+
+    @Override
+    public void close() {
+        this.commitTimer.cancel();
+    }
+
+    private static void validateOrMakeDir(FileSystem fs, Path dir, String dirDescription) {
+        try {
+            if (fs.exists(dir)) {
+                if (!fs.isDirectory(dir)) {
+                    LOG.error(dirDescription + " directory is a file, not a dir. " + dir);
+                    throw new RuntimeException(dirDescription + " directory is a file, not a dir. " + dir);
+                }
+            } else if (!fs.mkdirs(dir)) {
+                LOG.error("Unable to create " + dirDescription + " directory " + dir);
+                throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir);
+            }
+        } catch (IOException e) {
+            LOG.error("Unable to create " + dirDescription + " directory " + dir, e);
+            throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir, e);
+        }
+    }
+
+    private String getDefaultLockDir(Path sourceDirPath) {
+        return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR;
+    }
+
+    private static void checkValidReader(String readerType) {
+        if (readerType.equalsIgnoreCase(Configs.TEXT) || readerType.equalsIgnoreCase(Configs.SEQ)) {
+            return;
+        }
+        try {
+            Class<?> classType = Class.forName(readerType);
+            classType.getConstructor(FileSystem.class, Path.class, Map.class);
+            return;
+        } catch (ClassNotFoundException e) {
+            LOG.error(readerType + " not found in classpath.", e);
+            throw new IllegalArgumentException(readerType + " not found in classpath.", e);
+        } catch (NoSuchMethodException e) {
+            LOG.error(readerType + " is missing the expected constructor for Readers.", e);
+            throw new IllegalArgumentException(readerType + " is missing the expected constructor for Readers.");
+        }
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        LOG.trace("Ack received for msg {} on spout {}", msgId, spoutId);
+        if (!ackEnabled) {
+            return;
+        }
+        MessageId id = (MessageId) msgId;
+        inflight.remove(id);
+        ++acksSinceLastCommit;
+        tracker.recordAckedOffset(id.offset);
+        commitProgress(tracker.getCommitPosition());
+        if (fileReadCompletely && inflight.isEmpty()) {
+            markFileAsDone(reader.getFilePath());
+            reader = null;
+        }
+        super.ack(msgId);
+    }
+
+    private boolean canCommitNow() {
+
+        if (commitFrequencyCount > 0 && acksSinceLastCommit >= commitFrequencyCount) {
+            return true;
+        }
+        return commitTimeElapsed.get();
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        LOG.trace("Fail received for msg id {} on spout {}", msgId, spoutId);
+        super.fail(msgId);
+        if (ackEnabled) {
+            HdfsUtils.Pair<MessageId, List<Object>> item = HdfsUtils.Pair.of(msgId, inflight.remove(msgId));
+            retryList.add(item);
+        }
+    }
+
+    private FileReader pickNextFile() {
+        try {
+            // 1) If there are any abandoned files, pick oldest one
+            lock = getOldestExpiredLock();
+            if (lock != null) {
+                LOG.debug("Spout {} now took over ownership of abandoned FileLock {}", spoutId, lock.getLockFile());
+                Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath);
+                String resumeFromOffset = lock.getLastLogEntry().fileOffset;
+                LOG.info("Resuming processing of abandoned file : {}", file);
+                return createFileReader(file, resumeFromOffset);
+            }
+
+            // 2) If no abandoned files, then pick oldest file in sourceDirPath, lock it and rename it
+            Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0);
+
+            for (Path file : listing) {
+                if (file.getName().endsWith(inprogress_suffix)) {
+                    continue;
+                }
+                if (file.getName().endsWith(ignoreSuffix)) {
+                    continue;
+                }
+                lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
+                if (lock == null) {
+                    LOG.debug("Unable to get FileLock for {}, so skipping it.", file);
+                    continue; // could not lock, so try another file.
+                }
+                try {
+                    Path newFile = renameToInProgressFile(file);
+                    FileReader result = createFileReader(newFile);
+                    LOG.info("Processing : {} ", file);
+                    return result;
+                } catch (Exception e) {
+                    LOG.error("Skipping file " + file, e);
+                    releaseLockAndLog(lock, spoutId);
+                    continue;
+                }
+            }
+
+            return null;
+        } catch (IOException e) {
+            LOG.error("Unable to select next file for consumption " + sourceDirPath, e);
+            return null;
+        }
+    }
+
+    /**
+     * If clocks in sync, then acquires the oldest expired lock Else, on first call, just remembers the oldest expired lock, on next call
+     * check if the lock is updated. if not updated then acquires the lock
+     *
+     * @return a lock object
+     * @throws IOException
+     */
+    private FileLock getOldestExpiredLock() throws IOException {
+        // 1 - acquire lock on dir
+        DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath);
+        if (dirlock == null) {
+            dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, lockTimeoutSec);
+            if (dirlock == null) {
+                LOG.debug("Spout {} could not take over ownership of DirLock for {}", spoutId, lockDirPath);
+                return null;
+            }
+            LOG.debug("Spout {} now took over ownership of abandoned DirLock for {}", spoutId, lockDirPath);
+        } else {
+            LOG.debug("Spout {} now owns DirLock for {}", spoutId, lockDirPath);
+        }
+
+        try {
+            // 2 - if clocks are in sync then simply take ownership of the oldest expired lock
+            if (clocksInSync) {
+                return FileLock.acquireOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId);
+            }
+
+            // 3 - if clocks are not in sync ..
+            if (lastExpiredLock == null) {
+                // just make a note of the oldest expired lock now and check if its still unmodified after lockTimeoutSec
+                lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec);
+                lastExpiredLockTime = System.currentTimeMillis();
+                return null;
+            }
+            // see if lockTimeoutSec time has elapsed since we last selected the lock file
+            if (hasExpired(lastExpiredLockTime)) {
+                return null;
+            }
+
+            // If lock file has expired, then own it
+            FileLock.LogEntry lastEntry = FileLock.getLastEntry(hdfs, lastExpiredLock.getKey());
+            if (lastEntry.equals(lastExpiredLock.getValue())) {
+                FileLock result = FileLock.takeOwnership(hdfs, lastExpiredLock.getKey(), lastEntry, spoutId);
+                lastExpiredLock = null;
+                return result;
+            } else {
+                // if lock file has been updated since last time, then leave this lock file alone
+                lastExpiredLock = null;
+                return null;
+            }
+        } finally {
+            dirlock.release();
+            LOG.debug("Released DirLock {}, SpoutID {} ", dirlock.getLockFile(), spoutId);
+        }
+    }
+
+    private boolean hasExpired(long lastModifyTime) {
+        return (System.currentTimeMillis() - lastModifyTime) < lockTimeoutSec * 1000;
+    }
+
+    /**
+     * Creates a reader that reads from beginning of file
+     *
+     * @param file file to read
+     * @return
+     * @throws IOException
+     */
+    private FileReader createFileReader(Path file)
+        throws IOException {
+        if (readerType.equalsIgnoreCase(Configs.SEQ)) {
+            return new SequenceFileReader(this.hdfs, file, conf);
+        }
+        if (readerType.equalsIgnoreCase(Configs.TEXT)) {
+            return new TextFileReader(this.hdfs, file, conf);
+        }
+        try {
+            Class<?> clsType = Class.forName(readerType);
+            Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class);
+            return (FileReader) constructor.newInstance(this.hdfs, file, conf);
         } catch (Exception e) {
-          LOG.error("Skipping file " + file, e);
-          releaseLockAndLog(lock, spoutId);
-          continue;
-        }
-      }
-
-      return null;
-    } catch (IOException e) {
-    LOG.error("Unable to select next file for consumption " + sourceDirPath, e);
-      return null;
-    }
-  }
-
-  /**
-   * If clocks in sync, then acquires the oldest expired lock
-   * Else, on first call, just remembers the oldest expired lock, on next call check if the lock is updated. if not updated then acquires the lock
-   * @return a lock object
-   * @throws IOException
-   */
-  private FileLock getOldestExpiredLock() throws IOException {
-    // 1 - acquire lock on dir
-    DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath);
-    if (dirlock == null) {
-      dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, lockTimeoutSec);
-      if (dirlock == null) {
-        LOG.debug("Spout {} could not take over ownership of DirLock for {}", spoutId, lockDirPath);
-        return null;
-      }
-      LOG.debug("Spout {} now took over ownership of abandoned DirLock for {}", spoutId, lockDirPath);
-    } else {
-      LOG.debug("Spout {} now owns DirLock for {}", spoutId, lockDirPath);
-    }
-
-    try {
-      // 2 - if clocks are in sync then simply take ownership of the oldest expired lock
-      if (clocksInSync) {
-        return FileLock.acquireOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId);
-      }
-
-      // 3 - if clocks are not in sync ..
-      if ( lastExpiredLock == null ) {
-        // just make a note of the oldest expired lock now and check if its still unmodified after lockTimeoutSec
-        lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec);
-        lastExpiredLockTime = System.currentTimeMillis();
-        return null;
-      }
-      // see if lockTimeoutSec time has elapsed since we last selected the lock file
-      if ( hasExpired(lastExpiredLockTime) ) {
-        return null;
-      }
-
-      // If lock file has expired, then own it
-      FileLock.LogEntry lastEntry = FileLock.getLastEntry(hdfs, lastExpiredLock.getKey());
-      if ( lastEntry.equals(lastExpiredLock.getValue()) ) {
-        FileLock result = FileLock.takeOwnership(hdfs, lastExpiredLock.getKey(), lastEntry, spoutId);
-        lastExpiredLock = null;
-        return  result;
-      } else {
-        // if lock file has been updated since last time, then leave this lock file alone
-        lastExpiredLock = null;
+            LOG.error(e.getMessage(), e);
+            throw new RuntimeException("Unable to instantiate " + readerType + " reader", e);
+        }
+    }
+
+    /**
+     * Creates a reader that starts reading from 'offset'.
+     *
+     * @param file the file to read
+     * @param offset an offset string in whatever format the configured reader type understands
+     * @return a reader positioned at 'offset'
+     * @throws IOException if the reader cannot be created
+     */
+    private FileReader createFileReader(Path file, String offset)
+        throws IOException {
+        if (readerType.equalsIgnoreCase(Configs.SEQ)) {
+            return new SequenceFileReader(this.hdfs, file, conf, offset);
+        }
+        if (readerType.equalsIgnoreCase(Configs.TEXT)) {
+            return new TextFileReader(this.hdfs, file, conf, offset);
+        }
+
+        try {
+            Class<?> clsType = Class.forName(readerType);
+            Constructor<?> constructor = 
clsType.getConstructor(FileSystem.class, Path.class, Map.class, String.class);
+            return (FileReader) constructor.newInstance(this.hdfs, file, conf, 
offset);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            throw new RuntimeException("Unable to instantiate " + readerType, 
e);
+        }
+    }
+
+    /**
+     * Renames the file by appending the .inprogress suffix.
+     *
+     * @return path of the renamed file
+     * @throws RenameException if the rename operation fails
+     */
+    private Path renameToInProgressFile(Path file)
+        throws IOException {
+        Path newFile = new Path(file.toString() + inprogress_suffix);
+        try {
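+            // FileSystem.rename signals some failures by returning false instead of throwing, so map both cases to RenameException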
+            if (hdfs.rename(file, newFile)) {
+                return newFile;
+            }
+            throw new RenameException(file, newFile);
+        } catch (IOException e) {
+            throw new RenameException(file, newFile, e);
+        }
+    }
+
+    /**
+     * Returns the corresponding input file in the 'sourceDirPath' for the specified lock file.
+     * If no such file is found then returns null.
+     */
+    private Path getFileForLockFile(Path lockFile, Path sourceDirPath)
+        throws IOException {
+        String lockFileName = lockFile.getName();
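+        // Check the .inprogress name first: a previous owner may have renamed the file before failing.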
+        Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogress_suffix);
+        if (hdfs.exists(dataFile)) {
+            return dataFile;
+        }
+        dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName);
+        if (hdfs.exists(dataFile)) {
+            return dataFile;
+        }
         return null;
-      }
-    } finally {
-      dirlock.release();
-      LOG.debug("Released DirLock {}, SpoutID {} ", dirlock.getLockFile(), 
spoutId);
-    }
-  }
-
-  private boolean hasExpired(long lastModifyTime) {
-    return (System.currentTimeMillis() - lastModifyTime ) < lockTimeoutSec*1000;
-  }
-
-  /**
-   * Creates a reader that reads from beginning of file
-   * @param file file to read
-   * @return
-   * @throws IOException
-   */
-  private FileReader createFileReader(Path file)
-          throws IOException {
-    if ( readerType.equalsIgnoreCase(Configs.SEQ) ) {
-      return new SequenceFileReader(this.hdfs, file, conf);
-    }
-    if ( readerType.equalsIgnoreCase(Configs.TEXT) ) {
-      return new TextFileReader(this.hdfs, file, conf);
-    }
-    try {
-      Class<?> clsType = Class.forName(readerType);
-      Constructor<?> constructor = clsType.getConstructor(FileSystem.class, 
Path.class, Map.class);
-      return (FileReader) constructor.newInstance(this.hdfs, file, conf);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      throw new RuntimeException("Unable to instantiate " + readerType + " 
reader", e);
-    }
-  }
-
-
-  /**
-   * Creates a reader that starts reading from 'offset'
-   * @param file the file to read
-   * @param offset the offset string should be understandable by the reader type being used
-   * @return
-   * @throws IOException
-   */
-  private FileReader createFileReader(Path file, String offset)
-          throws IOException {
-    if ( readerType.equalsIgnoreCase(Configs.SEQ) ) {
-      return new SequenceFileReader(this.hdfs, file, conf, offset);
-    }
-    if ( readerType.equalsIgnoreCase(Configs.TEXT) ) {
-      return new TextFileReader(this.hdfs, file, conf, offset);
-    }
-
-    try {
-      Class<?> clsType = Class.forName(readerType);
-      Constructor<?> constructor = clsType.getConstructor(FileSystem.class, 
Path.class, Map.class, String.class);
-      return (FileReader) constructor.newInstance(this.hdfs, file, conf, 
offset);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      throw new RuntimeException("Unable to instantiate " + readerType, e);
-    }
-  }
-
-  /**
-   * Renames files with .inprogress suffix
-   * @return path of renamed file
-   * @throws if operation fails
-   */
-  private Path renameToInProgressFile(Path file)
-          throws IOException {
-    Path newFile =  new Path( file.toString() + inprogress_suffix );
-    try {
-      if (hdfs.rename(file, newFile)) {
-        return newFile;
-      }
-      throw new RenameException(file, newFile);
-    } catch (IOException e){
-      throw new RenameException(file, newFile, e);
-    }
-  }
-
-  /** Returns the corresponding input file in the 'sourceDirPath' for the specified lock file.
-   *  If no such file is found then returns null
-   */
-  private Path getFileForLockFile(Path lockFile, Path sourceDirPath)
-          throws IOException {
-    String lockFileName = lockFile.getName();
-    Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogress_suffix);
-    if ( hdfs.exists(dataFile) ) {
-      return dataFile;
-    }
-    dataFile = new Path(sourceDirPath + Path.SEPARATOR +  lockFileName);
-    if ( hdfs.exists(dataFile) ) {
-      return dataFile;
-    }
-    return null;
-  }
-
-
-  // renames files and returns the new file path
-  private Path renameCompletedFile(Path file) throws IOException {
-    String fileName = file.toString();
-    String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
-    String newName = new Path(fileNameMinusSuffix).getName();
-
-    Path  newFile = new Path( archiveDirPath + Path.SEPARATOR + newName );
-    LOG.info("Completed consuming file {}", fileNameMinusSuffix);
-    if ( !hdfs.rename(file, newFile) ) {
-      throw new IOException("Rename failed for file: " + file);
-    }
-    LOG.debug("Renamed file {} to {} ", file, newFile);
-    return newFile;
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    if (outputStreamName!=null) {
-      declarer.declareStream(outputStreamName, outputFields);
-    } else {
-      declarer.declare(outputFields);
-    }
-  }
-
-  static class MessageId implements  Comparable<MessageId> {
-    public long msgNumber; // tracks order in which msg came in
-    public String fullPath;
-    public FileOffset offset;
-
-    public MessageId(long msgNumber, Path fullPath, FileOffset offset) {
-      this.msgNumber = msgNumber;
-      this.fullPath = fullPath.toString();
-      this.offset = offset;
     }
 
-    @Override
-    public String toString() {
-      return "{'" +  fullPath + "':" + offset + "}";
+    // renames files and returns the new file path
+    private Path renameCompletedFile(Path file) throws IOException {
+        String fileName = file.toString();
+        String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
+        String newName = new Path(fileNameMinusSuffix).getName();
+
+        Path newFile = new Path(archiveDirPath + Path.SEPARATOR + newName);
+        LOG.info("Completed consuming file {}", fileNameMinusSuffix);
+        if (!hdfs.rename(file, newFile)) {
+            throw new IOException("Rename failed for file: " + file);
+        }
+        LOG.debug("Renamed file {} to {} ", file, newFile);
+        return newFile;
     }
 
     @Override
-    public int compareTo(MessageId rhs) {
-      if ( msgNumber<rhs.msgNumber ) {
-        return -1;
-      }
-      if ( msgNumber>rhs.msgNumber ) {
-        return 1;
-      }
-      return 0;
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        if (outputStreamName != null) {
+            declarer.declareStream(outputStreamName, outputFields);
+        } else {
+            declarer.declare(outputFields);
+        }
     }
-  }
 
-  private static class RenameException extends IOException {
-    public final Path oldFile;
-    public final Path newFile;
+    static class MessageId implements Comparable<MessageId> {
+
+        public long msgNumber; // tracks order in which msg came in
+        public String fullPath;
+        public FileOffset offset;
 
-    public RenameException(Path oldFile, Path newFile) {
-      super("Rename of " + oldFile + " to " + newFile + " failed");
-      this.oldFile = oldFile;
-      this.newFile = newFile;
+        public MessageId(long msgNumber, Path fullPath, FileOffset offset) {
+            this.msgNumber = msgNumber;
+            this.fullPath = fullPath.toString();
+            this.offset = offset;
+        }
+
+        @Override
+        public String toString() {
+            return "{'" + fullPath + "':" + offset + "}";
+        }
+
+        @Override
+        public int compareTo(MessageId rhs) {
+            return Long.compare(msgNumber, rhs.msgNumber);
+        }
     }
 
-    public RenameException(Path oldFile, Path newFile, IOException cause) {
-      super("Rename of " + oldFile + " to " + newFile + " failed", cause);
-      this.oldFile = oldFile;
-      this.newFile = newFile;
+    private static class RenameException extends IOException {
+
+        public final Path oldFile;
+        public final Path newFile;
+
+        public RenameException(Path oldFile, Path newFile) {
+            super("Rename of " + oldFile + " to " + newFile + " failed");
+            this.oldFile = oldFile;
+            this.newFile = newFile;
+        }
+
+        public RenameException(Path oldFile, Path newFile, IOException cause) {
+            super("Rename of " + oldFile + " to " + newFile + " failed", 
cause);
+            this.oldFile = oldFile;
+            this.newFile = newFile;
+        }
     }
-  }
 }
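
For readers skimming the diff: the reworked getOldestExpiredLock() above uses
a two-call protocol when spout clocks may be out of sync with file timestamps
in HDFS. The first call only records the oldest expired lock and its last log
entry; a later call takes ownership only if lockTimeoutSec has since elapsed
on the local clock and the entry is unchanged. The standalone Java sketch
below mirrors that shape under simplifying assumptions: StaleLockTakeover and
tryTakeOwnership are hypothetical names, and the lock state that Storm reads
from HDFS is passed in here as plain strings.

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class StaleLockTakeover {

    private final long lockTimeoutMs;
    private Map.Entry<String, String> candidate; // lock path -> last log entry seen for it
    private long candidateNotedAt;

    public StaleLockTakeover(long lockTimeoutMs) {
        this.lockTimeoutMs = lockTimeoutMs;
    }

    /** Returns the lock path to take over, or null to retry later. */
    public String tryTakeOwnership(String oldestExpiredPath, String itsLastLogEntry) {
        if (candidate == null) {
            // First call: only note what the oldest expired lock looks like right now.
            candidate = new SimpleEntry<>(oldestExpiredPath, itsLastLogEntry);
            candidateNotedAt = System.currentTimeMillis();
            return null;
        }
        if (System.currentTimeMillis() - candidateNotedAt < lockTimeoutMs) {
            return null; // by our own clock the grace period has not elapsed yet
        }
        // Grace period over: the lock is stale only if nothing touched it meanwhile.
        boolean unchanged = oldestExpiredPath.equals(candidate.getKey())
                && itsLastLogEntry.equals(candidate.getValue());
        candidate = null; // reset for the next round either way
        return unchanged ? oldestExpiredPath : null;
    }
}

Keying the staleness decision on the lock's log entry rather than on its
modification time is what keeps the protocol safe without synchronized clocks.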

http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java
index a584f91..b588f5b 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java
@@ -21,8 +21,7 @@ import org.apache.avro.Schema;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
+import org.junit.BeforeClass;
 
 public class TestFixedAvroSerializer {
     //These should match FixedAvroSerializer.config in the test resources
@@ -34,12 +33,13 @@ public class TestFixedAvroSerializer {
             "\"name\":\"stormtest2\"," +
             "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," +
             "{ \"name\":\"intint1\", \"type\":\"int\" }]}";
-    private static final Schema schema1;
-    private static final Schema schema2;
+    private static Schema schema1;
+    private static Schema schema2;
 
     final AvroSchemaRegistry reg;
 
-    static {
+    @BeforeClass
+    public static void setupClass() {
 
         Schema.Parser parser = new Schema.Parser();
         schema1 = parser.parse(schemaString1);
@@ -58,7 +58,8 @@ public class TestFixedAvroSerializer {
         testTheSchema(schema2);
     }
 
-    @Test public void testDifferentFPs() {
+    @Test 
+    public void testDifferentFPs() {
         String fp1 = reg.getFingerprint(schema1);
         String fp2 = reg.getFingerprint(schema2);
 

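Both Avro serializer tests receive the same treatment: schema parsing moves
from a static initializer into a @BeforeClass method. The practical difference
is error reporting: an exception in a static initializer surfaces as an
ExceptionInInitializerError at class-load time, while a @BeforeClass failure
is reported by JUnit 4 as an ordinary per-class failure with the underlying
stack trace. A minimal sketch of the pattern follows; the class name and
schema JSON are placeholders, not taken from the tests.

import org.apache.avro.Schema;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertNotNull;

public class BeforeClassExample {

    private static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"example\","
        + "\"fields\":[{\"name\":\"f\",\"type\":\"string\"}]}";

    // Cannot be final anymore: it is assigned in @BeforeClass, not in a static initializer.
    private static Schema schema;

    @BeforeClass
    public static void setupClass() {
        // If parsing throws here, JUnit reports a normal test failure instead
        // of an ExceptionInInitializerError during class loading.
        schema = new Schema.Parser().parse(SCHEMA_JSON);
    }

    @Test
    public void schemaIsParsed() {
        assertNotNull(schema);
    }
}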
http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java
index ddfdcf5..fb97782 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java
@@ -19,6 +19,7 @@ package org.apache.storm.hdfs.avro;
 
 import org.apache.avro.Schema;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestGenericAvroSerializer {
@@ -30,12 +31,13 @@ public class TestGenericAvroSerializer {
             "\"name\":\"stormtest2\"," +
             "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," +
             "{ \"name\":\"intint1\", \"type\":\"int\" }]}";
-    private static final Schema schema1;
-    private static final Schema schema2;
+    private static Schema schema1;
+    private static Schema schema2;
 
     AvroSchemaRegistry reg = new GenericAvroSerializer();
 
-    static {
+    @BeforeClass
+    public static void setupClass() {
 
         Schema.Parser parser = new Schema.Parser();
         schema1 = parser.parse(schemaString1);
@@ -50,7 +52,8 @@ public class TestGenericAvroSerializer {
         testTheSchema(schema2);
     }
 
-    @Test public void testDifferentFPs() {
+    @Test 
+    public void testDifferentFPs() {
         String fp1 = reg.getFingerprint(schema1);
         String fp2 = reg.getFingerprint(schema2);
 
