http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 67ccde6..39a151f 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -1,32 +1,27 @@
-
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.spout;
 
-import org.apache.storm.Config;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.storm.hdfs.common.HdfsUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,46 +29,41 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.junit.Before;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.storm.Config;
+import org.apache.storm.hdfs.common.HdfsUtils;
+import org.apache.storm.hdfs.common.HdfsUtils.Pair;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
 import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.storm.hdfs.common.HdfsUtils.Pair;
-import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
-import org.junit.ClassRule;
-
 public class TestHdfsSpout {
 
+    private static final Configuration conf = new Configuration();
     @ClassRule
     public static MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
+    private static DistributedFileSystem fs;
     @Rule
     public TemporaryFolder tempFolder = new TemporaryFolder();
     public File baseFolder;
-
     private Path source;
     private Path archive;
     private Path badfiles;
 
-    private static DistributedFileSystem fs;
-    private static final Configuration conf = new Configuration();
-
     @BeforeClass
     public static void setupClass() throws IOException {
         fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem();
@@ -84,6 +74,50 @@ public class TestHdfsSpout {
         fs.close();
     }
 
+    private static <T> T getField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+        Field readerFld = HdfsSpout.class.getDeclaredField(fieldName);
+        readerFld.setAccessible(true);
+        return (T) readerFld.get(spout);
+    }
+
+    private static boolean getBoolField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+        Field readerFld = HdfsSpout.class.getDeclaredField(fieldName);
+        readerFld.setAccessible(true);
+        return readerFld.getBoolean(spout);
+    }
+
+    private static List<String> readTextFile(FileSystem fs, String f) throws IOException {
+        Path file = new Path(f);
+        FSDataInputStream x = fs.open(file);
+        BufferedReader reader = new BufferedReader(new InputStreamReader(x));
+        String line = null;
+        ArrayList<String> result = new ArrayList<>();
+        while ((line = reader.readLine()) != null) {
+            result.add(line);
+        }
+        return result;
+    }
+
+    private static void createSeqFile(FileSystem fs, Path file, int rowCount) throws IOException {
+
+        Configuration conf = new Configuration();
+        try {
+            if (fs.exists(file)) {
+                fs.delete(file, false);
+            }
+
+            SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, file, IntWritable.class, Text.class);
+            for (int i = 0; i < rowCount; i++) {
+                w.append(new IntWritable(i), new Text("line " + i));
+            }
+            w.close();
+            System.out.println("done");
+        } catch (IOException e) {
+            e.printStackTrace();
+
+        }
+    }
+
     @Before
     public void setup() throws Exception {
         baseFolder = tempFolder.newFolder("hdfsspout");
@@ -395,18 +429,6 @@ public class TestHdfsSpout {
         }
     }
 
-    private static <T> T getField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException {
-        Field readerFld = HdfsSpout.class.getDeclaredField(fieldName);
-        readerFld.setAccessible(true);
-        return (T) readerFld.get(spout);
-    }
-
-    private static boolean getBoolField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException {
-        Field readerFld = HdfsSpout.class.getDeclaredField(fieldName);
-        readerFld.setAccessible(true);
-        return readerFld.getBoolean(spout);
-    }
-
     @Test
     public void testSimpleSequenceFile() throws Exception {
         //1) create a couple files to consume
@@ -450,13 +472,14 @@ public class TestHdfsSpout {
         Assert.assertEquals(2, listDir(source).size());
 
         // 2) run spout
-        try (AutoCloseableHdfsSpout closeableSpout = makeSpout(MockTextFailingReader.class.getName(), MockTextFailingReader.defaultFields)) {
+        try (
+            AutoCloseableHdfsSpout closeableSpout = makeSpout(MockTextFailingReader.class.getName(), MockTextFailingReader.defaultFields)) {
             HdfsSpout spout = closeableSpout.spout;
             Map<String, Object> conf = getCommonConfigs();
             openSpout(spout, 0, conf);
 
             List<String> res = runSpout(spout, "r11");
-            String[] expected = new String[]{"[line 0]", "[line 1]", "[line 2]", "[line 0]", "[line 1]", "[line 2]"};
+            String[] expected = new String[]{ "[line 0]", "[line 1]", "[line 2]", "[line 0]", "[line 1]", "[line 2]" };
             Assert.assertArrayEquals(expected, res.toArray());
 
             // 3) make sure 6 lines (3 from each file) were read in all
@@ -576,18 +599,6 @@ public class TestHdfsSpout {
         }
     }
 
-    private static List<String> readTextFile(FileSystem fs, String f) throws IOException {
-        Path file = new Path(f);
-        FSDataInputStream x = fs.open(file);
-        BufferedReader reader = new BufferedReader(new InputStreamReader(x));
-        String line = null;
-        ArrayList<String> result = new ArrayList<>();
-        while ((line = reader.readLine()) != null) {
-            result.add(line);
-        }
-        return result;
-    }
-
     private Map<String, Object> getCommonConfigs() {
         Map<String, Object> topoConf = new HashMap<>();
         topoConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0");
@@ -596,29 +607,15 @@ public class TestHdfsSpout {
 
     private AutoCloseableHdfsSpout makeSpout(String readerType, String[] outputFields) {
         HdfsSpout spout = new HdfsSpout().withOutputFields(outputFields)
-            .setReaderType(readerType)
-            .setHdfsUri(DFS_CLUSTER_RULE.getDfscluster().getURI().toString())
-            .setSourceDir(source.toString())
-            .setArchiveDir(archive.toString())
-            .setBadFilesDir(badfiles.toString());
+                                         .setReaderType(readerType)
+                                         .setHdfsUri(DFS_CLUSTER_RULE.getDfscluster().getURI().toString())
+                                         .setSourceDir(source.toString())
+                                         .setArchiveDir(archive.toString())
+                                         .setBadFilesDir(badfiles.toString());
 
         return new AutoCloseableHdfsSpout(spout);
     }
 
-    private static class AutoCloseableHdfsSpout implements AutoCloseable {
-
-        private final HdfsSpout spout;
-
-        public AutoCloseableHdfsSpout(HdfsSpout spout) {
-            this.spout = spout;
-        }
-
-        @Override
-        public void close() throws Exception {
-            spout.close();
-        }
-    }
-
     private void openSpout(HdfsSpout spout, int spoutId, Map<String, Object> topoConf) {
         MockCollector collector = new MockCollector();
         spout.open(topoConf, new MockTopologyContext(spoutId, topoConf), collector);
@@ -665,23 +662,17 @@ public class TestHdfsSpout {
         os.close();
     }
 
-    private static void createSeqFile(FileSystem fs, Path file, int rowCount) throws IOException {
+    private static class AutoCloseableHdfsSpout implements AutoCloseable {
 
-        Configuration conf = new Configuration();
-        try {
-            if (fs.exists(file)) {
-                fs.delete(file, false);
-            }
+        private final HdfsSpout spout;
 
-            SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, file, IntWritable.class, Text.class);
-            for (int i = 0; i < rowCount; i++) {
-                w.append(new IntWritable(i), new Text("line " + i));
-            }
-            w.close();
-            System.out.println("done");
-        } catch (IOException e) {
-            e.printStackTrace();
+        public AutoCloseableHdfsSpout(HdfsSpout spout) {
+            this.spout = spout;
+        }
 
+        @Override
+        public void close() throws Exception {
+            spout.close();
         }
     }
 
@@ -729,7 +720,7 @@ public class TestHdfsSpout {
     // throws ParseException. Effectively produces 3 lines (1,2 & 3) from each file read
     static class MockTextFailingReader extends TextFileReader {
 
-        public static final String[] defaultFields = {"line"};
+        public static final String[] defaultFields = { "line" };
         int readAttempts = 0;
 
         public MockTextFailingReader(FileSystem fs, Path file, Map<String, Object> conf) throws IOException {
@@ -753,7 +744,11 @@ public class TestHdfsSpout {
         private final int componentId;
 
         public MockTopologyContext(int componentId, Map<String, Object> topoConf) {
-            // StormTopology topology, Map<String, Object> topoConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics, Atom openOrPrepareWasCalled
+            // StormTopology topology, Map<String, Object> topoConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>>
+            // componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String
+            // pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String,
+            // Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>>
+            // registeredMetrics, Atom openOrPrepareWasCalled
             super(null, topoConf, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
             this.componentId = componentId;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
index c631838..760e099 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
@@ -1,12 +1,7 @@
-
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
@@ -17,6 +12,8 @@
 
 package org.apache.storm.hdfs.spout;
 
+import java.io.File;
+import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -27,17 +24,13 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
-import java.io.IOException;
-
 public class TestProgressTracker {
 
-    private FileSystem fs;
-    private Configuration conf = new Configuration();
-
     @Rule
     public TemporaryFolder tempFolder = new TemporaryFolder();
     public File baseFolder;
+    private FileSystem fs;
+    private Configuration conf = new Configuration();
 
     @Before
     public void setUp() throws Exception {

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java
index 1d1f6c7..b94fb53 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java
@@ -26,7 +26,7 @@ import org.junit.runners.model.Statement;
 public class MiniDFSClusterRule implements TestRule {
 
     private static final String TEST_BUILD_DATA = "test.build.data";
-    
+
     private final Supplier<Configuration> hadoopConfSupplier;
     private Configuration hadoopConf;
     private MiniDFSCluster dfscluster;
@@ -34,11 +34,11 @@ public class MiniDFSClusterRule implements TestRule {
     public MiniDFSClusterRule() {
         this(() -> new Configuration());
     }
-    
+
     public MiniDFSClusterRule(Supplier<Configuration> hadoopConfSupplier) {
         this.hadoopConfSupplier = hadoopConfSupplier;
     }
-    
+
     public Configuration getHadoopConf() {
         return hadoopConf;
     }
@@ -46,7 +46,7 @@ public class MiniDFSClusterRule implements TestRule {
     public MiniDFSCluster getDfscluster() {
         return dfscluster;
     }
-    
+
     @Override
     public Statement apply(Statement base, Description description) {
         return new Statement() {

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
index e018016..3d7218c 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
@@ -1,34 +1,16 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.hdfs.trident;
 
-import org.apache.storm.Config;
-import org.apache.storm.tuple.Fields;
-import org.apache.commons.io.FileUtils;
-import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
-import org.apache.storm.hdfs.trident.format.FileNameFormat;
-import org.apache.storm.hdfs.trident.format.RecordFormat;
-import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.apache.storm.trident.tuple.TridentTuple;
+package org.apache.storm.hdfs.trident;
 
 import java.io.File;
 import java.io.IOException;
@@ -41,6 +23,18 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
+import org.apache.storm.hdfs.trident.format.FileNameFormat;
+import org.apache.storm.hdfs.trident.format.RecordFormat;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -56,30 +50,6 @@ public class HdfsStateTest {
     private static final String INDEX_FILE_PREFIX = ".index.";
     private final TestFileNameFormat fileNameFormat = new TestFileNameFormat();
 
-    private static class TestFileNameFormat implements FileNameFormat {
-        private String currentFileName = "";
-
-        @Override
-        public void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
-
-        }
-
-        @Override
-        public String getName(long rotation, long timeStamp) {
-            currentFileName = FILE_NAME_PREFIX + Long.toString(rotation);
-            return currentFileName;
-        }
-
-        @Override
-        public String getPath() {
-            return TEST_OUT_DIR;
-        }
-
-        public String getCurrentFileName() {
-            return currentFileName;
-        }
-    }
-
     private HdfsState createHdfsState() {
 
         Fields hdfsFields = new Fields("f1");
@@ -89,10 +59,10 @@ public class HdfsStateTest {
         FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
 
         HdfsState.Options options = new HdfsState.HdfsFileOptions()
-                .withFileNameFormat(fileNameFormat)
-                .withRecordFormat(recordFormat)
-                .withRotationPolicy(rotationPolicy)
-                .withFsUrl("file://" + TEST_OUT_DIR);
+            .withFileNameFormat(fileNameFormat)
+            .withRecordFormat(recordFormat)
+            .withRotationPolicy(rotationPolicy)
+            .withFsUrl("file://" + TEST_OUT_DIR);
 
         Map<String, Object> conf = new HashMap<>();
         conf.put(Config.TOPOLOGY_NAME, TEST_TOPOLOGY_NAME);
@@ -123,7 +93,6 @@ public class HdfsStateTest {
         FileUtils.deleteQuietly(new File(TEST_OUT_DIR));
     }
 
-
     @Test
     public void testPrepare() throws Exception {
         HdfsState state = createHdfsState();
@@ -212,7 +181,7 @@ public class HdfsStateTest {
         /*
          * total tuples should be
          * recovered (batch-1 + batch-2) + replayed (batch-3)
-        */
+         */
         List<String> lines = getLinesFromCurrentDataFile();
         int preReplayCount = batch1Count + batch2Count + batch3Count;
         int expectedTupleCount = batch1Count + batch2Count + batch3ReplayCount;
@@ -220,4 +189,28 @@ public class HdfsStateTest {
         Assert.assertNotEquals(preReplayCount, lines.size());
         Assert.assertEquals(expectedTupleCount, lines.size());
     }
+
+    private static class TestFileNameFormat implements FileNameFormat {
+        private String currentFileName = "";
+
+        @Override
+        public void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
+
+        }
+
+        @Override
+        public String getName(long rotation, long timeStamp) {
+            currentFileName = FILE_NAME_PREFIX + Long.toString(rotation);
+            return currentFileName;
+        }
+
+        @Override
+        public String getPath() {
+            return TEST_OUT_DIR;
+        }
+
+        public String getCurrentFileName() {
+            return currentFileName;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/format/TestSimpleFileNameFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/format/TestSimpleFileNameFormat.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/format/TestSimpleFileNameFormat.java
index 232b4cf..d4840b0 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/format/TestSimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/format/TestSimpleFileNameFormat.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident.format;
 
 import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
-
 import org.apache.storm.utils.Utils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,7 +30,7 @@ public class TestSimpleFileNameFormat {
 
         Assert.assertEquals("/storm", path);
         String time = new SimpleDateFormat("yyyyMMddHHmmss").format(now);
-        Assert.assertEquals(time+".1.txt", name);
+        Assert.assertEquals(time + ".1.txt", name);
     }
 
     @Test
@@ -49,7 +43,7 @@ public class TestSimpleFileNameFormat {
         long now = System.currentTimeMillis();
         String path = format.getPath();
         String name = format.getName(1, now);
-    
+
         Assert.assertEquals("/mypath", path);
         String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(now);
         String host = null;
@@ -58,13 +52,13 @@ public class TestSimpleFileNameFormat {
         } catch (UnknownHostException e) {
             e.printStackTrace();
         }
-        Assert.assertEquals(time+"."+host+".3.1.txt", name);
+        Assert.assertEquals(time + "." + host + ".3.1.txt", name);
     }
 
-    @Test(expected=IllegalArgumentException.class)
+    @Test(expected = IllegalArgumentException.class)
     public void testTimeFormat() {
         SimpleFileNameFormat format = new SimpleFileNameFormat()
-               .withTimeFormat("xyz");
+            .withTimeFormat("xyz");
         format.prepare(null, 3, 5);
     }
 }

Reply via email to