http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
index 53bcc4c..d4a95ee 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.common;
@@ -25,11 +19,11 @@ import 
org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.tuple.Tuple;
 
 abstract public class AbstractHDFSWriter implements Writer {
+    final protected Path filePath;
+    final protected FileRotationPolicy rotationPolicy;
     protected long lastUsedTime;
     protected long offset;
     protected boolean needsRotation;
-    final protected Path filePath;
-    final protected FileRotationPolicy rotationPolicy;
 
     public AbstractHDFSWriter(FileRotationPolicy policy, Path path) {
         //This must be defensively copied, because a bolt probably has only 
one rotation policy object

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
index 6e957c2..d77423c 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
@@ -1,23 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.common;
 
 
+import java.io.IOException;
+import java.util.EnumSet;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -31,9 +28,6 @@ import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.EnumSet;
-
 public class AvroGenericRecordHDFSWriter extends AbstractHDFSWriter {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AvroGenericRecordHDFSWriter.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
index d69d770..578bc06 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
@@ -1,22 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.common;
 
+import java.io.IOException;
+import java.util.EnumSet;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -26,10 +23,7 @@ import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.EnumSet;
-
-public class HDFSWriter extends AbstractHDFSWriter{
+public class HDFSWriter extends AbstractHDFSWriter {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HDFSWriter.class);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index 5ec5333..462087e 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -1,100 +1,93 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.common;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.ipc.RemoteException;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-
 public class HdfsUtils {
-  /** list files sorted by modification time that have not been modified since 
'olderThan'. if
-   * 'olderThan' is <= 0 then the filtering is disabled */
-  public static ArrayList<Path> listFilesByModificationTime(FileSystem fs, 
Path directory, long olderThan)
-          throws IOException {
-    ArrayList<LocatedFileStatus> fstats = new ArrayList<>();
+    /** list files sorted by modification time that have not been modified 
since 'olderThan'. if
+     * 'olderThan' is <= 0 then the filtering is disabled */
+    public static ArrayList<Path> listFilesByModificationTime(FileSystem fs, 
Path directory, long olderThan)
+        throws IOException {
+        ArrayList<LocatedFileStatus> fstats = new ArrayList<>();
 
-    RemoteIterator<LocatedFileStatus> itr = fs.listFiles(directory, false);
-    while( itr.hasNext() ) {
-      LocatedFileStatus fileStatus = itr.next();
-      if(olderThan>0) {
-        if( fileStatus.getModificationTime()<=olderThan )
-          fstats.add(fileStatus);
-      }
-      else {
-        fstats.add(fileStatus);
-      }
-    }
-    Collections.sort(fstats, new ModifTimeComparator() );
+        RemoteIterator<LocatedFileStatus> itr = fs.listFiles(directory, false);
+        while (itr.hasNext()) {
+            LocatedFileStatus fileStatus = itr.next();
+            if (olderThan > 0) {
+                if (fileStatus.getModificationTime() <= olderThan) {
+                    fstats.add(fileStatus);
+                }
+            } else {
+                fstats.add(fileStatus);
+            }
+        }
+        Collections.sort(fstats, new ModifTimeComparator());
 
-    ArrayList<Path> result = new ArrayList<>(fstats.size());
-    for (LocatedFileStatus fstat : fstats) {
-      result.add(fstat.getPath());
+        ArrayList<Path> result = new ArrayList<>(fstats.size());
+        for (LocatedFileStatus fstat : fstats) {
+            result.add(fstat.getPath());
+        }
+        return result;
     }
-    return result;
-  }
 
-  /**
-   * Returns null if file already exists. throws if there was unexpected 
problem
-   */
-  public static FSDataOutputStream tryCreateFile(FileSystem fs, Path file) 
throws IOException {
-    try {
-      FSDataOutputStream os = fs.create(file, false);
-      return os;
-    } catch (FileAlreadyExistsException e) {
-      return null;
-    } catch (RemoteException e) {
-      if( e.unwrapRemoteException() instanceof AlreadyBeingCreatedException ) {
-        return null;
-      } else { // unexpected error
-        throw e;
-      }
+    /**
+     * Returns null if file already exists. throws if there was unexpected 
problem
+     */
+    public static FSDataOutputStream tryCreateFile(FileSystem fs, Path file) 
throws IOException {
+        try {
+            FSDataOutputStream os = fs.create(file, false);
+            return os;
+        } catch (FileAlreadyExistsException e) {
+            return null;
+        } catch (RemoteException e) {
+            if (e.unwrapRemoteException() instanceof 
AlreadyBeingCreatedException) {
+                return null;
+            } else { // unexpected error
+                throw e;
+            }
+        }
     }
-  }
 
-  public static class Pair<K,V> {
-    private K key;
-    private V value;
-    public Pair(K key, V value) {
-      this.key = key;
-      this.value = value;
-    }
+    public static class Pair<K, V> {
+        private K key;
+        private V value;
 
-    public K getKey() {
-      return key;
-    }
+        public Pair(K key, V value) {
+            this.key = key;
+            this.value = value;
+        }
 
-    public V getValue() {
-      return value;
-    }
+        public static <K, V> Pair of(K key, V value) {
+            return new Pair(key, value);
+        }
 
-    public static <K,V> Pair of(K key, V value) {
-      return new Pair(key,value);
-    }
-  }  // class Pair
+        public K getKey() {
+            return key;
+        }
+
+        public V getValue() {
+            return value;
+        }
+    }  // class Pair
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java
index 0558b3f..47ebdfe 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java
@@ -1,32 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.common;
 
-import org.apache.hadoop.fs.FileStatus;
-
 import java.util.Comparator;
+import org.apache.hadoop.fs.FileStatus;
 
 
 public class ModifTimeComparator
-        implements Comparator<FileStatus> {
-   @Override
+    implements Comparator<FileStatus> {
+    @Override
     public int compare(FileStatus o1, FileStatus o2) {
-      return new Long(o1.getModificationTime()).compareTo( 
o2.getModificationTime() );
+        return new 
Long(o1.getModificationTime()).compareTo(o2.getModificationTime());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java
index fd50496..3137f48 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.common;
 
 import org.apache.storm.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
index 6cf0fbd..9f79373 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
@@ -1,27 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.hdfs.common;
 
-import org.apache.storm.tuple.Tuple;
+package org.apache.storm.hdfs.common;
 
 import java.io.Serializable;
+import org.apache.storm.tuple.Tuple;
 
-public interface Partitioner extends Serializable{
+public interface Partitioner extends Serializable {
 
     /**
      * Return a relative path that the tuple should be written to. For 
example, if an HdfsBolt were configured to write

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java
index ec78fd6..d0507b8 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java
@@ -1,22 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.common;
 
+import java.io.IOException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.storm.hdfs.bolt.format.SequenceFormat;
@@ -25,9 +21,7 @@ import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
-public class SequenceFileWriter extends AbstractHDFSWriter{
+public class SequenceFileWriter extends AbstractHDFSWriter {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SequenceFileWriter.class);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/MoveFileAction.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/MoveFileAction.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/MoveFileAction.java
index 585307d..0e8e1dd 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/MoveFileAction.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/MoveFileAction.java
@@ -15,21 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hdfs.common.rotation;
 
+import java.io.IOException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 public class MoveFileAction implements RotationAction {
     private static final Logger LOG = 
LoggerFactory.getLogger(MoveFileAction.class);
 
     private String destination;
 
-    public MoveFileAction toDestination(String destDir){
+    public MoveFileAction toDestination(String destDir) {
         destination = destDir;
         return this;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java
index b15c314..f5ade03 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java
@@ -15,14 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.hdfs.common.rotation;
 
+package org.apache.storm.hdfs.common.rotation;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.io.Serializable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 public interface RotationAction extends Serializable {
     void execute(FileSystem fileSystem, Path filePath) throws IOException;

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
index e1339df..6d5537b 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.spout;
@@ -24,37 +18,41 @@ import org.apache.hadoop.fs.Path;
 
 abstract class AbstractFileReader implements FileReader {
 
-  private final Path file;
+    private final Path file;
 
-  public AbstractFileReader(FileSystem fs, Path file) {
-    if (fs == null ) {
-      throw new IllegalArgumentException("filesystem arg cannot be null for 
reader");
+    public AbstractFileReader(FileSystem fs, Path file) {
+        if (fs == null) {
+            throw new IllegalArgumentException("filesystem arg cannot be null 
for reader");
+        }
+        if (file == null) {
+            throw new IllegalArgumentException("file arg cannot be null for 
reader");
+        }
+        this.file = file;
     }
-    if (file == null ) {
-      throw new IllegalArgumentException("file arg cannot be null for reader");
-    }
-    this.file = file;
-  }
 
-  @Override
-  public Path getFilePath() {
-    return file;
-  }
+    @Override
+    public Path getFilePath() {
+        return file;
+    }
 
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) { return true; }
-    if (o == null || getClass() != o.getClass()) { return false; }
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
-    AbstractFileReader that = (AbstractFileReader) o;
+        AbstractFileReader that = (AbstractFileReader) o;
 
-    return !(file != null ? !file.equals(that.file) : that.file != null);
-  }
+        return !(file != null ? !file.equals(that.file) : that.file != null);
+    }
 
-  @Override
-  public int hashCode() {
-    return file != null ? file.hashCode() : 0;
-  }
+    @Override
+    public int hashCode() {
+        return file != null ? file.hashCode() : 0;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
index cb2607a..f94b8e5 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.spout;
@@ -29,13 +23,6 @@ import org.apache.storm.validation.NotConf;
 import org.apache.storm.validation.Validated;
 
 public class Configs implements Validated {
-    public static class ReaderTypeValidator extends Validator {
-        @Override
-        public void validateField(String name, Object o) {
-            HdfsSpout.checkValidReader((String)o);
-        }
-    }
-    
     /**
      * @deprecated please use {@link HdfsSpout.setReaderType(String)}
      */
@@ -45,7 +32,6 @@ public class Configs implements Validated {
     public static final String READER_TYPE = "hdfsspout.reader.type";        
// Required - chose the file type being consumed
     public static final String TEXT = "text";
     public static final String SEQ = "seq";
-    
     /**
      * @deprecated please use {@link HdfsSpout#setHdfsUri(String)}
      */
@@ -81,7 +67,7 @@ public class Configs implements Validated {
      */
     @Deprecated
     @isInteger
-    @isPositiveNumber(includeZero=true)
+    @isPositiveNumber(includeZero = true)
     public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count";  
// commit after N records. 0 disables this.
     /**
      * @deprecated please use {@link HdfsSpout#setCommitFrequencySec(int)}
@@ -95,7 +81,7 @@ public class Configs implements Validated {
      */
     @Deprecated
     @isInteger
-    @isPositiveNumber(includeZero=true)
+    @isPositiveNumber(includeZero = true)
     public static final String MAX_OUTSTANDING = "hdfsspout.max.outstanding";
     /**
      * @deprecated please use {@link HdfsSpout#setLockTimeoutSec(int)}
@@ -103,27 +89,34 @@ public class Configs implements Validated {
     @Deprecated
     @isInteger
     @isPositiveNumber
-    public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec";   
// inactivity duration after which locks are considered candidates for being 
reassigned to another spout
+    public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec";
     /**
      * @deprecated please use {@link HdfsSpout#setClocksInSync(boolean)}
      */
     @Deprecated
     @isBoolean
     public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync";     
// if clocks on machines in the Storm cluster are in sync
+        // inactivity duration after which locks are considered candidates for 
being reassigned to another spout
     /**
      * @deprecated please use {@link HdfsSpout#setIgnoreSuffix(String)}
      */
     @Deprecated
     @isString
-    public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix";     
// filenames with this suffix in archive dir will be ignored by the Spout
-
+    public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix";
     @NotConf
     public static final String DEFAULT_LOCK_DIR = ".lock";
+        // filenames with this suffix in archive dir will be ignored by the 
Spout
     public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000;
     public static final int DEFAULT_COMMIT_FREQ_SEC = 10;
     public static final int DEFAULT_MAX_OUTSTANDING = 10000;
     public static final int DEFAULT_LOCK_TIMEOUT = 5 * 60; // 5 min
-    
     @isMapEntryType(keyType = String.class, valueType = String.class)
     public static final String DEFAULT_HDFS_CONFIG_KEY = "hdfs.config";
+
+    public static class ReaderTypeValidator extends Validator {
+        @Override
+        public void validateField(String name, Object o) {
+            HdfsSpout.checkValidReader((String) o);
+        }
+    }
 } // class Configs

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index 25a136c..eea23e1 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -1,23 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.spout;
 
+import java.io.IOException;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -26,108 +21,107 @@ import org.apache.storm.hdfs.common.HdfsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * Facility to synchronize access to HDFS directory. The lock itself is 
represented
  * as a file in the same directory. Relies on atomic file creation.
  */
 public class DirLock {
-  private FileSystem fs;
-  private final Path lockFile;
-  public static final String DIR_LOCK_FILE = "DIRLOCK";
-  private static final Logger LOG = LoggerFactory.getLogger(DirLock.class);
-  private DirLock(FileSystem fs, Path lockFile) throws IOException {
-    if( fs.isDirectory(lockFile) ) {
-      throw new IllegalArgumentException(lockFile.toString() + " is not a 
directory");
-    }
-    this.fs = fs;
-    this.lockFile = lockFile;
-  }
-
-  /** Get a lock on file if not already locked
-   *
-   * @param fs
-   * @param dir  the dir on which to get a lock
-   * @return The lock object if it the lock was acquired. Returns null if the 
dir is already locked.
-   * @throws IOException if there were errors
-   */
-  public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
-    Path lockFile = getDirLockFile(dir);
+    public static final String DIR_LOCK_FILE = "DIRLOCK";
+    private static final Logger LOG = LoggerFactory.getLogger(DirLock.class);
+    private final Path lockFile;
+    private FileSystem fs;
 
-    try {
-      FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
-      if (ostream!=null) {
-        LOG.debug("Thread ({}) Acquired lock on dir {}", threadInfo(), dir);
-        ostream.close();
-        return new DirLock(fs, lockFile);
-      } else {
-        LOG.debug("Thread ({}) cannot lock dir {} as its already locked.", 
threadInfo(), dir);
-        return null;
-      }
-    } catch (IOException e) {
-        LOG.error("Error when acquiring lock on dir " + dir, e);
-        throw e;
+    private DirLock(FileSystem fs, Path lockFile) throws IOException {
+        if (fs.isDirectory(lockFile)) {
+            throw new IllegalArgumentException(lockFile.toString() + " is not 
a directory");
+        }
+        this.fs = fs;
+        this.lockFile = lockFile;
     }
-  }
 
-  private static Path getDirLockFile(Path dir) {
-    return new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
-  }
+    /** Get a lock on file if not already locked
+     *
+     * @param fs
+     * @param dir  the dir on which to get a lock
+     * @return The lock object if it the lock was acquired. Returns null if 
the dir is already locked.
+     * @throws IOException if there were errors
+     */
+    public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
+        Path lockFile = getDirLockFile(dir);
 
-  private static String threadInfo () {
-    return "ThdId=" + Thread.currentThread().getId() + ", ThdName="
-            + Thread.currentThread().getName();
-  }
+        try {
+            FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
+            if (ostream != null) {
+                LOG.debug("Thread ({}) Acquired lock on dir {}", threadInfo(), 
dir);
+                ostream.close();
+                return new DirLock(fs, lockFile);
+            } else {
+                LOG.debug("Thread ({}) cannot lock dir {} as its already 
locked.", threadInfo(), dir);
+                return null;
+            }
+        } catch (IOException e) {
+            LOG.error("Error when acquiring lock on dir " + dir, e);
+            throw e;
+        }
+    }
 
-  /** Release lock on dir by deleting the lock file */
-  public void release() throws IOException {
-    if(!fs.delete(lockFile, false)) {
-      LOG.error("Thread {} could not delete dir lock {} ", threadInfo(), 
lockFile);
+    private static Path getDirLockFile(Path dir) {
+        return new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE);
     }
-    else {
-      LOG.debug("Thread {} Released dir lock {} ", threadInfo(), lockFile);
+
+    private static String threadInfo() {
+        return "ThdId=" + Thread.currentThread().getId() + ", ThdName="
+               + Thread.currentThread().getName();
     }
-  }
 
-  /** if the lock on the directory is stale, take ownership */
-  public static DirLock takeOwnershipIfStale(FileSystem fs, Path dirToLock, 
int lockTimeoutSec) {
-    Path dirLockFile = getDirLockFile(dirToLock);
+    /** if the lock on the directory is stale, take ownership */
+    public static DirLock takeOwnershipIfStale(FileSystem fs, Path dirToLock, 
int lockTimeoutSec) {
+        Path dirLockFile = getDirLockFile(dirToLock);
 
-    long now =  System.currentTimeMillis();
-    long expiryTime = now - (lockTimeoutSec*1000);
+        long now = System.currentTimeMillis();
+        long expiryTime = now - (lockTimeoutSec * 1000);
 
-    try {
-      long modTime = fs.getFileStatus(dirLockFile).getModificationTime();
-      if(modTime <= expiryTime) {
-        return takeOwnership(fs, dirLockFile);
-      }
-      return null;
-    } catch (IOException e)  {
-      return  null;
+        try {
+            long modTime = fs.getFileStatus(dirLockFile).getModificationTime();
+            if (modTime <= expiryTime) {
+                return takeOwnership(fs, dirLockFile);
+            }
+            return null;
+        } catch (IOException e) {
+            return null;
+        }
     }
-  }
 
-  private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) throws 
IOException {
-    if(fs instanceof DistributedFileSystem) {
-      if (!((DistributedFileSystem) fs).recoverLease(dirLockFile)) {
-        LOG.warn("Unable to recover lease on dir lock file " + dirLockFile + " 
right now. Cannot transfer ownership. Will need to try later.");
+    private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) 
throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+            if (!((DistributedFileSystem) fs).recoverLease(dirLockFile)) {
+                LOG.warn("Unable to recover lease on dir lock file " + 
dirLockFile +
+                         " right now. Cannot transfer ownership. Will need to 
try later.");
+                return null;
+            }
+        }
+
+        // delete and recreate lock file
+        if (fs.delete(dirLockFile, false)) { // returns false if somebody else 
already deleted it (to take ownership)
+            FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, 
dirLockFile);
+            if (ostream != null) {
+                ostream.close();
+            }
+            return new DirLock(fs, dirLockFile);
+        }
         return null;
-      }
     }
 
-    // delete and recreate lock file
-    if( fs.delete(dirLockFile, false) ) { // returns false if somebody else 
already deleted it (to take ownership)
-      FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, dirLockFile);
-      if(ostream!=null) {
-        ostream.close();
-      }
-      return new DirLock(fs, dirLockFile);
+    /** Release lock on dir by deleting the lock file */
+    public void release() throws IOException {
+        if (!fs.delete(lockFile, false)) {
+            LOG.error("Thread {} could not delete dir lock {} ", threadInfo(), 
lockFile);
+        } else {
+            LOG.debug("Thread {} Released dir lock {} ", threadInfo(), 
lockFile);
+        }
     }
-    return null;
-  }
 
-  public Path getLockFile() {
-    return lockFile;
-  }
+    public Path getLockFile() {
+        return lockFile;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index a7cb2b8..c5a2f55 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -1,24 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.spout;
 
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Collection;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,11 +28,6 @@ import org.apache.storm.hdfs.common.HdfsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Collection;
-
 /**
  * Facility to synchronize access to HDFS files. Thread gains exclusive access 
to a file by acquiring
  * a FileLock object. The lock itself is represented as file on HDFS. Relies 
on atomic file creation.
@@ -43,291 +36,303 @@ import java.util.Collection;
  */
 public class FileLock {
 
-  private final FileSystem fs;
-  private final String componentID;
-  private final Path lockFile;
-  private final FSDataOutputStream lockFileStream;
-  private LogEntry lastEntry;
-
-  private static final Logger LOG = LoggerFactory.getLogger(FileLock.class);
-
-  private FileLock(FileSystem fs, Path lockFile, FSDataOutputStream 
lockFileStream, String spoutId)
-          throws IOException {
-    this.fs = fs;
-    this.lockFile = lockFile;
-    this.lockFileStream = lockFileStream;
-    this.componentID = spoutId;
-    logProgress("0", false);
-  }
-
-  private FileLock(FileSystem fs, Path lockFile, String spoutId, LogEntry 
entry)
-          throws IOException {
-    this.fs = fs;
-    this.lockFile = lockFile;
-    this.lockFileStream =  fs.append(lockFile);
-    this.componentID = spoutId;
-    LOG.info("Acquired abandoned lockFile {}, Spout {}", lockFile, spoutId);
-    logProgress(entry.fileOffset, true);
-  }
-
-  public void heartbeat(String fileOffset) throws IOException {
-    logProgress(fileOffset, true);
-  }
-
-  // new line is at beginning of each line (instead of end) for better 
recovery from
-  // partial writes of prior lines
-  private void logProgress(String fileOffset, boolean prefixNewLine)
-          throws IOException {
-    long now = System.currentTimeMillis();
-    LogEntry entry = new LogEntry(now, componentID, fileOffset);
-    String line = entry.toString();
-    if(prefixNewLine) {
-      lockFileStream.writeBytes(System.lineSeparator() + line);
+    private static final Logger LOG = LoggerFactory.getLogger(FileLock.class);
+    private final FileSystem fs;
+    private final String componentID;
+    private final Path lockFile;
+    private final FSDataOutputStream lockFileStream;
+    private LogEntry lastEntry;
+
+    private FileLock(FileSystem fs, Path lockFile, FSDataOutputStream 
lockFileStream, String spoutId)
+        throws IOException {
+        this.fs = fs;
+        this.lockFile = lockFile;
+        this.lockFileStream = lockFileStream;
+        this.componentID = spoutId;
+        logProgress("0", false);
     }
-    else {
-      lockFileStream.writeBytes(line);
+
+    private FileLock(FileSystem fs, Path lockFile, String spoutId, LogEntry 
entry)
+        throws IOException {
+        this.fs = fs;
+        this.lockFile = lockFile;
+        this.lockFileStream = fs.append(lockFile);
+        this.componentID = spoutId;
+        LOG.info("Acquired abandoned lockFile {}, Spout {}", lockFile, 
spoutId);
+        logProgress(entry.fileOffset, true);
     }
-    lockFileStream.hflush();
-
-    lastEntry = entry; // update this only after writing to hdfs
-  }
-
-  /** Release lock by deleting file
-   * @throws IOException if lock file could not be deleted
-   */
-  public void release() throws IOException {
-    lockFileStream.close();
-    if(!fs.delete(lockFile, false)) {
-      LOG.warn("Unable to delete lock file, Spout = {}", componentID);
-      throw new IOException("Unable to delete lock file");
+
+    /** returns lock on file or null if file is already locked. throws if 
unexpected problem */
+    public static FileLock tryLock(FileSystem fs, Path fileToLock, Path 
lockDirPath, String spoutId)
+        throws IOException {
+        Path lockFile = new Path(lockDirPath, fileToLock.getName());
+
+        try {
+            FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
+            if (ostream != null) {
+                LOG.debug("Acquired lock on file {}. LockFile= {}, Spout = 
{}", fileToLock, lockFile, spoutId);
+                return new FileLock(fs, lockFile, ostream, spoutId);
+            } else {
+                LOG.debug("Cannot lock file {} as its already locked. Spout = 
{}", fileToLock, spoutId);
+                return null;
+            }
+        } catch (IOException e) {
+            LOG.error("Error when acquiring lock on file " + fileToLock + " 
Spout = " + spoutId, e);
+            throw e;
+        }
     }
-    LOG.debug("Released lock file {}. Spout {}", lockFile, componentID);
-  }
-
-  // For testing only.. invoked via reflection
-  private void forceCloseLockFile() throws IOException {
-    lockFileStream.close();
-  }
-
-  /** returns lock on file or null if file is already locked. throws if 
unexpected problem */
-  public static FileLock tryLock(FileSystem fs, Path fileToLock, Path 
lockDirPath, String spoutId)
-          throws IOException {
-    Path lockFile = new Path(lockDirPath, fileToLock.getName());
-
-    try {
-      FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
-      if (ostream != null) {
-        LOG.debug("Acquired lock on file {}. LockFile= {}, Spout = {}", 
fileToLock, lockFile, spoutId);
-        return new FileLock(fs, lockFile, ostream, spoutId);
-      } else {
-        LOG.debug("Cannot lock file {} as its already locked. Spout = {}", 
fileToLock, spoutId);
+
+    /**
+     * checks if lockFile is older than 'olderThan' UTC time by examining the 
modification time
+     * on file and (if necessary) the timestamp in last log entry in the file. 
If its stale, then
+     * returns the last log entry, else returns null.
+     * @param fs
+     * @param lockFile
+     * @param olderThan  time (millis) in UTC.
+     * @return the last entry in the file if its too old. null if last entry 
is not too old
+     * @throws IOException
+     */
+    public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, 
long olderThan)
+        throws IOException {
+        long modifiedTime = fs.getFileStatus(lockFile).getModificationTime();
+        if (modifiedTime <= olderThan) { // look
+            //Impt: HDFS timestamp may not reflect recent appends, so we 
double check the
+            // timestamp in last line of file to see when the last update was 
made
+            LogEntry lastEntry = getLastEntry(fs, lockFile);
+            if (lastEntry == null) {
+                LOG.warn("Empty lock file found. Deleting it. {}", lockFile);
+                try {
+                    if (!fs.delete(lockFile, false)) {
+                        throw new IOException("Empty lock file deletion 
failed");
+                    }
+                } catch (Exception e) {
+                    LOG.error("Unable to delete empty lock file " + lockFile, 
e);
+                }
+            }
+            if (lastEntry.eventTime <= olderThan) {
+                return lastEntry;
+            }
+        }
         return null;
-      }
-    } catch (IOException e) {
-      LOG.error("Error when acquiring lock on file " + fileToLock + " Spout = 
" + spoutId, e);
-      throw e;
     }
-  }
-
-  /**
-   * checks if lockFile is older than 'olderThan' UTC time by examining the 
modification time
-   * on file and (if necessary) the timestamp in last log entry in the file. 
If its stale, then
-   * returns the last log entry, else returns null.
-   * @param fs
-   * @param lockFile
-   * @param olderThan  time (millis) in UTC.
-   * @return the last entry in the file if its too old. null if last entry is 
not too old
-   * @throws IOException
-   */
-  public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, 
long olderThan)
-          throws IOException {
-    long modifiedTime = fs.getFileStatus(lockFile).getModificationTime();
-    if( modifiedTime <= olderThan ) { // look
-      //Impt: HDFS timestamp may not reflect recent appends, so we double 
check the
-      // timestamp in last line of file to see when the last update was made
-      LogEntry lastEntry =  getLastEntry(fs, lockFile);
-      if(lastEntry==null) {
-        LOG.warn("Empty lock file found. Deleting it. {}", lockFile);
-        try {
-          if(!fs.delete(lockFile, false))
-            throw new IOException("Empty lock file deletion failed");
-        } catch (Exception e) {
-          LOG.error("Unable to delete empty lock file " + lockFile, e);
+
+    /**
+     * returns the last log entry
+     * @param fs
+     * @param lockFile
+     * @return
+     * @throws IOException
+     */
+    public static LogEntry getLastEntry(FileSystem fs, Path lockFile)
+        throws IOException {
+        FSDataInputStream in = fs.open(lockFile);
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+        String lastLine = null;
+        for (String line = reader.readLine(); line != null; line = 
reader.readLine()) {
+            lastLine = line;
         }
-      }
-      if( lastEntry.eventTime <= olderThan )
-        return lastEntry;
+        return LogEntry.deserialize(lastLine);
     }
-    return null;
-  }
-
-  /**
-   * returns the last log entry
-   * @param fs
-   * @param lockFile
-   * @return
-   * @throws IOException
-   */
-  public static LogEntry getLastEntry(FileSystem fs, Path lockFile)
-          throws IOException {
-    FSDataInputStream in = fs.open(lockFile);
-    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
-    String lastLine = null;
-    for(String line = reader.readLine(); line!=null; line = reader.readLine() 
) {
-      lastLine=line;
+
+    /**
+     * Takes ownership of the lock file if possible.
+     * @param lockFile
+     * @param lastEntry   last entry in the lock file. this param is an 
optimization.
+     *                    we dont scan the lock file again to find its last 
entry here since
+     *                    its already been done once by the logic used to 
check if the lock
+     *                    file is stale. so this value comes from that earlier 
scan.
+     * @param spoutId     spout id
+     * @throws IOException if unable to acquire
+     * @return null if lock File is not recoverable
+     */
+    public static FileLock takeOwnership(FileSystem fs, Path lockFile, 
LogEntry lastEntry, String spoutId)
+        throws IOException {
+        try {
+            if (fs instanceof DistributedFileSystem) {
+                if (!((DistributedFileSystem) fs).recoverLease(lockFile)) {
+                    LOG.warn(
+                        "Unable to recover lease on lock file {} right now. 
Cannot transfer ownership. Will need to try later. Spout = {}",
+                        lockFile, spoutId);
+                    return null;
+                }
+            }
+            return new FileLock(fs, lockFile, spoutId, lastEntry);
+        } catch (IOException e) {
+            if (e instanceof RemoteException &&
+                ((RemoteException) e).unwrapRemoteException() instanceof 
AlreadyBeingCreatedException) {
+                LOG.warn(
+                    "Lock file " + lockFile + "is currently open. Cannot 
transfer ownership now. Will need to try later. Spout= " + spoutId,
+                    e);
+                return null;
+            } else { // unexpected error
+                LOG.warn("Cannot transfer ownership now for lock file " + 
lockFile + ". Will need to try later. Spout =" + spoutId, e);
+                throw e;
+            }
+        }
     }
-    return LogEntry.deserialize(lastLine);
-  }
-
-  /**
-   * Takes ownership of the lock file if possible.
-   * @param lockFile
-   * @param lastEntry   last entry in the lock file. this param is an 
optimization.
-   *                    we dont scan the lock file again to find its last 
entry here since
-   *                    its already been done once by the logic used to check 
if the lock
-   *                    file is stale. so this value comes from that earlier 
scan.
-   * @param spoutId     spout id
-   * @throws IOException if unable to acquire
-   * @return null if lock File is not recoverable
-   */
-  public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry 
lastEntry, String spoutId)
-          throws IOException {
-    try {
-      if(fs instanceof DistributedFileSystem ) {
-        if( !((DistributedFileSystem) fs).recoverLease(lockFile) ) {
-          LOG.warn("Unable to recover lease on lock file {} right now. Cannot 
transfer ownership. Will need to try later. Spout = {}", lockFile, spoutId);
-          return null;
+
+    /**
+     * Finds a oldest expired lock file (using modification timestamp), then 
takes
+     * ownership of the lock file
+     * Impt: Assumes access to lockFilesDir has been externally synchronized 
such that
+     *       only one thread accessing the same thread
+     * @param fs
+     * @param lockFilesDir
+     * @param locktimeoutSec
+     * @return
+     */
+    public static FileLock acquireOldestExpiredLock(FileSystem fs, Path 
lockFilesDir, int locktimeoutSec, String spoutId)
+        throws IOException {
+        // list files
+        long now = System.currentTimeMillis();
+        long olderThan = now - (locktimeoutSec * 1000);
+        Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, 
lockFilesDir, olderThan);
+
+        // locate expired lock files (if any). Try to take ownership (oldest 
lock first)
+        for (Path file : listing) {
+            if (file.getName().equalsIgnoreCase(DirLock.DIR_LOCK_FILE)) {
+                continue;
+            }
+            LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan);
+            if (lastEntry != null) {
+                FileLock lock = FileLock.takeOwnership(fs, file, lastEntry, 
spoutId);
+                if (lock != null) {
+                    return lock;
+                }
+            }
+        }
+        if (listing.isEmpty()) {
+            LOG.debug("No abandoned lock files found by Spout {}", spoutId);
         }
-      }
-      return new FileLock(fs, lockFile, spoutId, lastEntry);
-    } catch (IOException e) {
-      if (e instanceof RemoteException &&
-              ((RemoteException) e).unwrapRemoteException() instanceof 
AlreadyBeingCreatedException) {
-        LOG.warn("Lock file " + lockFile + "is currently open. Cannot transfer 
ownership now. Will need to try later. Spout= " + spoutId, e);
         return null;
-      } else { // unexpected error
-        LOG.warn("Cannot transfer ownership now for lock file " + lockFile + 
". Will need to try later. Spout =" + spoutId, e);
-        throw e;
-      }
     }
-  }
-
-  /**
-   * Finds a oldest expired lock file (using modification timestamp), then 
takes
-   * ownership of the lock file
-   * Impt: Assumes access to lockFilesDir has been externally synchronized 
such that
-   *       only one thread accessing the same thread
-   * @param fs
-   * @param lockFilesDir
-   * @param locktimeoutSec
-   * @return
-   */
-  public static FileLock acquireOldestExpiredLock(FileSystem fs, Path 
lockFilesDir, int locktimeoutSec, String spoutId)
-          throws IOException {
-    // list files
-    long now = System.currentTimeMillis();
-    long olderThan = now - (locktimeoutSec*1000);
-    Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, 
lockFilesDir, olderThan);
-
-    // locate expired lock files (if any). Try to take ownership (oldest lock 
first)
-    for (Path file : listing) {
-      if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) ) {
-        continue;
-      }
-      LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan);
-      if(lastEntry!=null) {
-        FileLock lock = FileLock.takeOwnership(fs, file, lastEntry, spoutId);
-        if(lock!=null) {
-          return lock;
+
+    /**
+     * Finds the oldest expired lock file (using modification timestamp), then 
takes
+     * ownership of the lock file
+     * Impt: Assumes access to lockFilesDir has been externally synchronized 
such that
+     *       only one thread is accessing it at a time
+     * @param fs
+     * @param lockFilesDir
+     * @param locktimeoutSec
+     * @return a Pair<lock file path, last entry in lock file> .. if expired 
lock file found
+     * @throws IOException
+     */
+    public static HdfsUtils.Pair<Path, LogEntry> 
locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec)
+        throws IOException {
+        // list files
+        long now = System.currentTimeMillis();
+        long olderThan = now - (locktimeoutSec * 1000);
+        Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, 
lockFilesDir, olderThan);
+
+        // locate oldest expired lock file (if any) and take ownership
+        for (Path file : listing) {
+            if (file.getName().equalsIgnoreCase(DirLock.DIR_LOCK_FILE)) {
+                continue;
+            }
+            LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan);
+            if (lastEntry != null) {
+                return new HdfsUtils.Pair<>(file, lastEntry);
+            }
         }
-      }
+        LOG.debug("No abandoned files found");
+        return null;
     }
-    if(listing.isEmpty()) {
-      LOG.debug("No abandoned lock files found by Spout {}", spoutId);
+
+    public void heartbeat(String fileOffset) throws IOException {
+        logProgress(fileOffset, true);
     }
-    return null;
-  }
-
-
-  /**
-   * Finds oldest expired lock file (using modification timestamp), then takes
-   * ownership of the lock file
-   * Impt: Assumes access to lockFilesDir has been externally synchronized 
such that
-   *       only one thread accessing the same thread
-   * @param fs
-   * @param lockFilesDir
-   * @param locktimeoutSec
-   * @return a Pair<lock file path, last entry in lock file> .. if expired 
lock file found
-   * @throws IOException
-   */
-  public static HdfsUtils.Pair<Path,LogEntry> 
locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec)
-          throws IOException {
-    // list files
-    long now =  System.currentTimeMillis();
-    long olderThan = now - (locktimeoutSec*1000);
-    Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, 
lockFilesDir, olderThan);
-
-    // locate oldest expired lock file (if any) and take ownership
-    for (Path file : listing) {
-      if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) ) {
-        continue;
-      }
-      LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan);
-      if(lastEntry!=null) {
-        return new HdfsUtils.Pair<>(file, lastEntry);
-      }
+
+    // new line is at beginning of each line (instead of end) for better 
recovery from
+    // partial writes of prior lines
+    private void logProgress(String fileOffset, boolean prefixNewLine)
+        throws IOException {
+        long now = System.currentTimeMillis();
+        LogEntry entry = new LogEntry(now, componentID, fileOffset);
+        String line = entry.toString();
+        if (prefixNewLine) {
+            lockFileStream.writeBytes(System.lineSeparator() + line);
+        } else {
+            lockFileStream.writeBytes(line);
+        }
+        lockFileStream.hflush();
+
+        lastEntry = entry; // update this only after writing to hdfs
+    }
+
+    /** Release lock by deleting file
+     * @throws IOException if lock file could not be deleted
+     */
+    public void release() throws IOException {
+        lockFileStream.close();
+        if (!fs.delete(lockFile, false)) {
+            LOG.warn("Unable to delete lock file, Spout = {}", componentID);
+            throw new IOException("Unable to delete lock file");
+        }
+        LOG.debug("Released lock file {}. Spout {}", lockFile, componentID);
     }
-    LOG.debug("No abandoned files found");
-    return null;
-  }
-
-  public LogEntry getLastLogEntry() {
-    return lastEntry;
-  }
-
-  public Path getLockFile() {
-    return lockFile;
-  }
-
-  public static class LogEntry {
-    private static final int NUM_FIELDS = 3;
-    public final long eventTime;
-    public final String componentID;
-    public final String fileOffset;
-
-    public LogEntry(long eventtime, String componentID, String fileOffset) {
-      this.eventTime = eventtime;
-      this.componentID = componentID;
-      this.fileOffset = fileOffset;
+
+    // For testing only.. invoked via reflection
+    private void forceCloseLockFile() throws IOException {
+        lockFileStream.close();
     }
 
-    public String toString() {
-      return eventTime + "," + componentID + "," + fileOffset;
+    public LogEntry getLastLogEntry() {
+        return lastEntry;
     }
-    public static LogEntry deserialize(String line) {
-      String[] fields = line.split(",", NUM_FIELDS);
-      return new LogEntry(Long.parseLong(fields[0]), fields[1], fields[2]);
+
+    public Path getLockFile() {
+        return lockFile;
     }
 
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) { return true; }
-      if (!(o instanceof LogEntry)) { return false; }
+    public static class LogEntry {
+        private static final int NUM_FIELDS = 3;
+        public final long eventTime;
+        public final String componentID;
+        public final String fileOffset;
 
-      LogEntry logEntry = (LogEntry) o;
+        public LogEntry(long eventtime, String componentID, String fileOffset) 
{
+            this.eventTime = eventtime;
+            this.componentID = componentID;
+            this.fileOffset = fileOffset;
+        }
 
-      if (eventTime != logEntry.eventTime) { return false; }
-      if (!componentID.equals(logEntry.componentID)) { return false; }
-      return fileOffset.equals(logEntry.fileOffset);
+        public static LogEntry deserialize(String line) {
+            String[] fields = line.split(",", NUM_FIELDS);
+            return new LogEntry(Long.parseLong(fields[0]), fields[1], 
fields[2]);
+        }
 
-    }
+        public String toString() {
+            return eventTime + "," + componentID + "," + fileOffset;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof LogEntry)) {
+                return false;
+            }
+
+            LogEntry logEntry = (LogEntry) o;
+
+            if (eventTime != logEntry.eventTime) {
+                return false;
+            }
+            if (!componentID.equals(logEntry.componentID)) {
+                return false;
+            }
+            return fileOffset.equals(logEntry.fileOffset);
 
-    @Override
-    public int hashCode() {
-      int result = (int) (eventTime ^ (eventTime >>> 32));
-      result = 31 * result + componentID.hashCode();
-      result = 31 * result + fileOffset.hashCode();
-      return result;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = (int) (eventTime ^ (eventTime >>> 32));
+            result = 31 * result + componentID.hashCode();
+            result = 31 * result + fileOffset.hashCode();
+            return result;
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
index 78296b9..bf58815 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.spout;
@@ -30,7 +24,8 @@ package org.apache.storm.hdfs.spout;
  */
 
 interface FileOffset extends Comparable<FileOffset>, Cloneable {
-  /** tests if rhs == currOffset+1 */
-  boolean isNextOffset(FileOffset rhs);
-  FileOffset clone();
+    /** tests if rhs == currOffset+1 */
+    boolean isNextOffset(FileOffset rhs);
+
+    FileOffset clone();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
index 54a90d4..49d998a 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
@@ -1,44 +1,37 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.spout;
 
-import org.apache.hadoop.fs.Path;
-
 import java.io.IOException;
 import java.util.List;
+import org.apache.hadoop.fs.Path;
 
 interface FileReader {
-  Path getFilePath();
+    Path getFilePath();
 
-  /**
-   * A simple numeric value may not be sufficient for certain formats 
consequently
-   * this is a String.
-   */
-  FileOffset getFileOffset();
+    /**
+     * A simple numeric value may not be sufficient for certain formats, 
consequently
+     * this is a String.
+     */
+    FileOffset getFileOffset();
 
-  /**
-   * Get the next tuple from the file
-   *
-   * @return null if no more data
-   * @throws IOException
-   */
-  List<Object> next() throws IOException, ParseException;
+    /**
+     * Get the next tuple from the file
+     *
+     * @return null if no more data
+     * @throws IOException
+     */
+    List<Object> next() throws IOException, ParseException;
 
-  void close();
+    void close();
 }

Reply via email to