http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java 
b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 2867e6a..2889cd1 100644
--- 
a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -1,26 +1,16 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.daemon.supervisor;
 
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+package org.apache.storm.daemon.supervisor;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,11 +19,10 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
-import org.apache.storm.daemon.supervisor.Slot.StaticState;
-import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
 import org.apache.storm.daemon.supervisor.Slot.DynamicState;
 import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.StaticState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
 import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LocalAssignment;
@@ -53,6 +42,23 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class SlotTest {
     private static final Logger LOG = LoggerFactory.getLogger(SlotTest.class);
 
@@ -61,17 +67,17 @@ public class SlotTest {
         if (cpu != null) {
             resources.set_cpu(cpu);
         }
-        
+
         if (mem_on_heap != null) {
             resources.set_mem_on_heap(mem_on_heap);
         }
-        
+
         if (mem_off_heap != null) {
             resources.set_mem_off_heap(mem_off_heap);
         }
         return resources;
     }
-    
+
     static LSWorkerHeartbeat mkWorkerHB(String id, int port, 
List<ExecutorInfo> exec, Integer timeSecs) {
         LSWorkerHeartbeat ret = new LSWorkerHeartbeat();
         ret.set_topology_id(id);
@@ -80,8 +86,8 @@ public class SlotTest {
         ret.set_time_secs(timeSecs);
         return ret;
     }
-    
-    static List<ExecutorInfo> mkExecutorInfoList(int ... executors) {
+
+    static List<ExecutorInfo> mkExecutorInfoList(int... executors) {
         ArrayList<ExecutorInfo> ret = new ArrayList<>(executors.length);
         for (int exec : executors) {
             ExecutorInfo execInfo = new ExecutorInfo();
@@ -91,7 +97,7 @@ public class SlotTest {
         }
         return ret;
     }
-    
+
     static LocalAssignment mkLocalAssignment(String id, List<ExecutorInfo> 
exec, WorkerResources resources) {
         LocalAssignment ret = new LocalAssignment();
         ret.set_topology_id(id);
@@ -101,19 +107,19 @@ public class SlotTest {
         }
         return ret;
     }
-    
+
     @Test
     public void testEquivilant() {
-        LocalAssignment a = mkLocalAssignment("A", 
mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 100.0, 100.0));
-        LocalAssignment aResized = mkLocalAssignment("A", 
mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 200.0, 100.0));
-        LocalAssignment b = mkLocalAssignment("B", 
mkExecutorInfoList(1,2,3,4,5,6), mkWorkerResources(100.0, 100.0, 100.0));
-        LocalAssignment bReordered = mkLocalAssignment("B", 
mkExecutorInfoList(6,5,4,3,2,1), mkWorkerResources(100.0, 100.0, 100.0));
+        LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 
4, 5), mkWorkerResources(100.0, 100.0, 100.0));
+        LocalAssignment aResized = mkLocalAssignment("A", 
mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 200.0, 100.0));
+        LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1, 2, 3, 
4, 5, 6), mkWorkerResources(100.0, 100.0, 100.0));
+        LocalAssignment bReordered = mkLocalAssignment("B", 
mkExecutorInfoList(6, 5, 4, 3, 2, 1), mkWorkerResources(100.0, 100.0, 100.0));
 
         assertTrue(Slot.equivalent(null, null));
         assertTrue(Slot.equivalent(a, a));
         assertTrue(Slot.equivalent(b, bReordered));
         assertTrue(Slot.equivalent(bReordered, b));
-        
+
         assertFalse(Slot.equivalent(a, aResized));
         assertFalse(Slot.equivalent(aResized, a));
         assertFalse(Slot.equivalent(a, null));
@@ -123,10 +129,10 @@ public class SlotTest {
 
     @Test
     public void testForSameTopology() {
-        LocalAssignment a = mkLocalAssignment("A", 
mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 100.0, 100.0));
-        LocalAssignment aResized = mkLocalAssignment("A", 
mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 200.0, 100.0));
-        LocalAssignment b = mkLocalAssignment("B", 
mkExecutorInfoList(1,2,3,4,5,6), mkWorkerResources(100.0, 100.0, 100.0));
-        LocalAssignment bReordered = mkLocalAssignment("B", 
mkExecutorInfoList(6,5,4,3,2,1), mkWorkerResources(100.0, 100.0, 100.0));
+        LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 
4, 5), mkWorkerResources(100.0, 100.0, 100.0));
+        LocalAssignment aResized = mkLocalAssignment("A", 
mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 200.0, 100.0));
+        LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1, 2, 3, 
4, 5, 6), mkWorkerResources(100.0, 100.0, 100.0));
+        LocalAssignment bReordered = mkLocalAssignment("B", 
mkExecutorInfoList(6, 5, 4, 3, 2, 1), mkWorkerResources(100.0, 100.0, 100.0));
 
         assertTrue(Slot.forSameTopology(null, null));
         assertTrue(Slot.forSameTopology(a, a));
@@ -142,29 +148,29 @@ public class SlotTest {
 
     @Test
     public void testEmptyToEmpty() throws Exception {
-        try (SimulatedTime t = new SimulatedTime(1010)){
+        try (SimulatedTime t = new SimulatedTime(1010)) {
             AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             LocalState state = mock(LocalState.class);
             BlobChangingCallback cb = mock(BlobChangingCallback.class);
             ContainerLauncher containerLauncher = 
mock(ContainerLauncher.class);
             ISupervisor iSuper = mock(ISupervisor.class);
             StaticState staticState = new StaticState(localizer, 1000, 1000, 
1000, 1000,
-                    containerLauncher, "localhost", 8080, iSuper, state, cb, 
null, null);
+                                                      containerLauncher, 
"localhost", 8080, iSuper, state, cb, null, null);
             DynamicState dynamicState = new DynamicState(null, null, null);
             DynamicState nextState = Slot.handleEmpty(dynamicState, 
staticState);
             assertEquals(MachineState.EMPTY, nextState.state);
             assertTrue(Time.currentTimeMillis() > 1000);
         }
     }
-    
+
     @Test
     public void testLaunchContainerFromEmpty() throws Exception {
-        try (SimulatedTime t = new SimulatedTime(1010)){
+        try (SimulatedTime t = new SimulatedTime(1010)) {
             int port = 8080;
             String topoId = "NEW";
-            List<ExecutorInfo> execList =  mkExecutorInfoList(1,2,3,4,5);
-            LocalAssignment newAssignment = 
-                    mkLocalAssignment(topoId, execList, 
mkWorkerResources(100.0, 100.0, 100.0));
+            List<ExecutorInfo> execList = mkExecutorInfoList(1, 2, 3, 4, 5);
+            LocalAssignment newAssignment =
+                mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 
100.0, 100.0));
 
             AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             BlobChangingCallback cb = mock(BlobChangingCallback.class);
@@ -178,12 +184,12 @@ public class SlotTest {
             @SuppressWarnings("unchecked")
             CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
             when(localizer.requestDownloadTopologyBlobs(newAssignment, port, 
cb)).thenReturn(blobFuture);
-            
+
             ISupervisor iSuper = mock(ISupervisor.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb, 
null, null);
+                                                      containerLauncher, 
"localhost", port, iSuper, state, cb, null, null);
             DynamicState dynamicState = new DynamicState(null, null, null)
-                    .withNewAssignment(newAssignment);
+                .withNewAssignment(newAssignment);
 
             DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState);
             verify(localizer).requestDownloadTopologyBlobs(newAssignment, 
port, cb);
@@ -191,7 +197,7 @@ public class SlotTest {
             assertSame("pendingDownload not set properly", blobFuture, 
nextState.pendingDownload);
             assertEquals(newAssignment, nextState.pendingLocalization);
             assertEquals(0, Time.currentTimeMillis());
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             verify(blobFuture).get(1000, TimeUnit.MILLISECONDS);
             verify(containerLauncher).launchContainer(port, newAssignment, 
state);
@@ -201,7 +207,7 @@ public class SlotTest {
             assertSame(newAssignment, nextState.currentAssignment);
             assertSame(container, nextState.container);
             assertEquals(0, Time.currentTimeMillis());
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
             assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
@@ -209,7 +215,7 @@ public class SlotTest {
             assertSame(newAssignment, nextState.currentAssignment);
             assertSame(container, nextState.container);
             assertEquals(0, Time.currentTimeMillis());
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
             assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
@@ -217,7 +223,7 @@ public class SlotTest {
             assertSame(newAssignment, nextState.currentAssignment);
             assertSame(container, nextState.container);
             assertTrue(Time.currentTimeMillis() > 1000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
             assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
@@ -231,59 +237,59 @@ public class SlotTest {
 
     @Test
     public void testRelaunch() throws Exception {
-        try (SimulatedTime t = new SimulatedTime(1010)){
+        try (SimulatedTime t = new SimulatedTime(1010)) {
             int port = 8080;
             String topoId = "CURRENT";
-            List<ExecutorInfo> execList =  mkExecutorInfoList(1,2,3,4,5);
-            LocalAssignment assignment = 
-                    mkLocalAssignment(topoId, execList, 
mkWorkerResources(100.0, 100.0, 100.0));
-            
+            List<ExecutorInfo> execList = mkExecutorInfoList(1, 2, 3, 4, 5);
+            LocalAssignment assignment =
+                mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 
100.0, 100.0));
+
             AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             BlobChangingCallback cb = mock(BlobChangingCallback.class);
             Container container = mock(Container.class);
             ContainerLauncher containerLauncher = 
mock(ContainerLauncher.class);
-            LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, 
Time.currentTimeSecs()-10);
+            LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, 
Time.currentTimeSecs() - 10);
             LSWorkerHeartbeat goodhb = mkWorkerHB(topoId, port, execList, 
Time.currentTimeSecs());
             when(container.readHeartbeat()).thenReturn(oldhb, oldhb, goodhb, 
goodhb);
             when(container.areAllProcessesDead()).thenReturn(false, true);
-            
+
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb, 
null, null);
+                                                      containerLauncher, 
"localhost", port, iSuper, state, cb, null, null);
             DynamicState dynamicState = new DynamicState(assignment, 
container, assignment);
-            
+
             DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState);
             assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state);
             verify(container).kill();
             assertTrue(Time.currentTimeMillis() > 1000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state);
             verify(container).forceKill();
             assertTrue(Time.currentTimeMillis() > 2000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.WAITING_FOR_WORKER_START, 
nextState.state);
             verify(container).relaunch();
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.WAITING_FOR_WORKER_START, 
nextState.state);
             assertTrue(Time.currentTimeMillis() > 3000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
         }
     }
-    
+
     @Test
     public void testReschedule() throws Exception {
-        try (SimulatedTime t = new SimulatedTime(1010)){
+        try (SimulatedTime t = new SimulatedTime(1010)) {
             int port = 8080;
             String cTopoId = "CURRENT";
-            List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
-            LocalAssignment cAssignment = 
-                    mkLocalAssignment(cTopoId, cExecList, 
mkWorkerResources(100.0, 100.0, 100.0));
+            List<ExecutorInfo> cExecList = mkExecutorInfoList(1, 2, 3, 4, 5);
+            LocalAssignment cAssignment =
+                mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 
100.0, 100.0));
 
             BlobChangingCallback cb = mock(BlobChangingCallback.class);
 
@@ -291,12 +297,12 @@ public class SlotTest {
             LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, 
Time.currentTimeSecs());
             when(cContainer.readHeartbeat()).thenReturn(chb);
             when(cContainer.areAllProcessesDead()).thenReturn(false, true);
-            
+
             String nTopoId = "NEW";
-            List<ExecutorInfo> nExecList =  mkExecutorInfoList(1,2,3,4,5);
-            LocalAssignment nAssignment = 
-                    mkLocalAssignment(nTopoId, nExecList, 
mkWorkerResources(100.0, 100.0, 100.0));
-            
+            List<ExecutorInfo> nExecList = mkExecutorInfoList(1, 2, 3, 4, 5);
+            LocalAssignment nAssignment =
+                mkLocalAssignment(nTopoId, nExecList, mkWorkerResources(100.0, 
100.0, 100.0));
+
             AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             Container nContainer = mock(Container.class);
             LocalState state = mock(LocalState.class);
@@ -304,16 +310,16 @@ public class SlotTest {
             when(containerLauncher.launchContainer(port, nAssignment, 
state)).thenReturn(nContainer);
             LSWorkerHeartbeat nhb = mkWorkerHB(nTopoId, 100, nExecList, 
Time.currentTimeSecs());
             when(nContainer.readHeartbeat()).thenReturn(nhb, nhb);
-            
+
             @SuppressWarnings("unchecked")
             CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
             when(localizer.requestDownloadTopologyBlobs(nAssignment, port, 
cb)).thenReturn(blobFuture);
-            
+
             ISupervisor iSuper = mock(ISupervisor.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb, 
null, null);
+                                                      containerLauncher, 
"localhost", port, iSuper, state, cb, null, null);
             DynamicState dynamicState = new DynamicState(cAssignment, 
cContainer, nAssignment);
-            
+
             DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState);
             assertEquals(MachineState.KILL, nextState.state);
             verify(cContainer).kill();
@@ -321,20 +327,20 @@ public class SlotTest {
             assertSame("pendingDownload not set properly", blobFuture, 
nextState.pendingDownload);
             assertEquals(nAssignment, nextState.pendingLocalization);
             assertTrue(Time.currentTimeMillis() > 1000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.KILL, nextState.state);
             verify(cContainer).forceKill();
             assertSame("pendingDownload not set properly", blobFuture, 
nextState.pendingDownload);
             assertEquals(nAssignment, nextState.pendingLocalization);
             assertTrue(Time.currentTimeMillis() > 2000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, 
nextState.state);
             verify(cContainer).cleanUp();
             verify(localizer).releaseSlotFor(cAssignment, port);
             assertTrue(Time.currentTimeMillis() > 2000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             verify(blobFuture).get(1000, TimeUnit.MILLISECONDS);
             verify(containerLauncher).launchContainer(port, nAssignment, 
state);
@@ -344,7 +350,7 @@ public class SlotTest {
             assertSame(nAssignment, nextState.currentAssignment);
             assertSame(nContainer, nextState.container);
             assertTrue(Time.currentTimeMillis() > 2000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
             assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
@@ -352,7 +358,7 @@ public class SlotTest {
             assertSame(nAssignment, nextState.currentAssignment);
             assertSame(nContainer, nextState.container);
             assertTrue(Time.currentTimeMillis() > 2000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
             assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
@@ -360,7 +366,7 @@ public class SlotTest {
             assertSame(nAssignment, nextState.currentAssignment);
             assertSame(nContainer, nextState.container);
             assertTrue(Time.currentTimeMillis() > 3000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
             assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
@@ -370,31 +376,31 @@ public class SlotTest {
             assertTrue(Time.currentTimeMillis() > 4000);
         }
     }
-    
+
     @Test
     public void testRunningToEmpty() throws Exception {
-        try (SimulatedTime t = new SimulatedTime(1010)){
+        try (SimulatedTime t = new SimulatedTime(1010)) {
             int port = 8080;
             String cTopoId = "CURRENT";
-            List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
-            LocalAssignment cAssignment = 
-                    mkLocalAssignment(cTopoId, cExecList, 
mkWorkerResources(100.0, 100.0, 100.0));
-            
+            List<ExecutorInfo> cExecList = mkExecutorInfoList(1, 2, 3, 4, 5);
+            LocalAssignment cAssignment =
+                mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 
100.0, 100.0));
+
             Container cContainer = mock(Container.class);
             LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, 
Time.currentTimeSecs());
             when(cContainer.readHeartbeat()).thenReturn(chb);
             when(cContainer.areAllProcessesDead()).thenReturn(false, true);
-            
+
             AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             BlobChangingCallback cb = mock(BlobChangingCallback.class);
             ContainerLauncher containerLauncher = 
mock(ContainerLauncher.class);
-            
+
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb, 
null, null);
+                                                      containerLauncher, 
"localhost", port, iSuper, state, cb, null, null);
             DynamicState dynamicState = new DynamicState(cAssignment, 
cContainer, null);
-            
+
             DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState);
             assertEquals(MachineState.KILL, nextState.state);
             verify(cContainer).kill();
@@ -402,14 +408,14 @@ public class SlotTest {
             assertSame("pendingDownload not set properly", null, 
nextState.pendingDownload);
             assertEquals(null, nextState.pendingLocalization);
             assertTrue(Time.currentTimeMillis() > 1000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.KILL, nextState.state);
             verify(cContainer).forceKill();
             assertSame("pendingDownload not set properly", null, 
nextState.pendingDownload);
             assertEquals(null, nextState.pendingLocalization);
             assertTrue(Time.currentTimeMillis() > 2000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.EMPTY, nextState.state);
             verify(cContainer).cleanUp();
@@ -417,13 +423,13 @@ public class SlotTest {
             assertEquals(null, nextState.container);
             assertEquals(null, nextState.currentAssignment);
             assertTrue(Time.currentTimeMillis() > 2000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.EMPTY, nextState.state);
             assertEquals(null, nextState.container);
             assertEquals(null, nextState.currentAssignment);
             assertTrue(Time.currentTimeMillis() > 3000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.EMPTY, nextState.state);
             assertEquals(null, nextState.container);
@@ -431,29 +437,29 @@ public class SlotTest {
             assertTrue(Time.currentTimeMillis() > 3000);
         }
     }
-    
+
     @Test
     public void testRunWithProfileActions() throws Exception {
-        try (SimulatedTime t = new SimulatedTime(1010)){
+        try (SimulatedTime t = new SimulatedTime(1010)) {
             int port = 8080;
             String cTopoId = "CURRENT";
-            List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
-            LocalAssignment cAssignment = 
-                    mkLocalAssignment(cTopoId, cExecList, 
mkWorkerResources(100.0, 100.0, 100.0));
-            
+            List<ExecutorInfo> cExecList = mkExecutorInfoList(1, 2, 3, 4, 5);
+            LocalAssignment cAssignment =
+                mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 
100.0, 100.0));
+
             Container cContainer = mock(Container.class);
-            LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, 
Time.currentTimeSecs()+100); //NOT going to timeout for a while
+            LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, 
Time.currentTimeSecs() + 100); //NOT going to timeout for a while
             when(cContainer.readHeartbeat()).thenReturn(chb, chb, chb, chb, 
chb, chb);
             when(cContainer.runProfiling(any(ProfileRequest.class), 
anyBoolean())).thenReturn(true);
-            
+
             AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             BlobChangingCallback cb = mock(BlobChangingCallback.class);
             ContainerLauncher containerLauncher = 
mock(ContainerLauncher.class);
-            
+
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb, 
null, null);
+                                                      containerLauncher, 
"localhost", port, iSuper, state, cb, null, null);
             Set<TopoProfileAction> profileActions = new HashSet<>();
             ProfileRequest request = new ProfileRequest();
             request.set_action(ProfileAction.JPROFILE_STOP);
@@ -462,56 +468,57 @@ public class SlotTest {
             info.add_to_port(port);
             request.set_nodeInfo(info);
             request.set_time_stamp(Time.currentTimeMillis() + 3000);//3 
seconds from now
-            
+
             TopoProfileAction profile = new TopoProfileAction(cTopoId, 
request);
             profileActions.add(profile);
             Set<TopoProfileAction> expectedPending = new HashSet<>();
             expectedPending.add(profile);
-            
-            
-            DynamicState dynamicState = new DynamicState(cAssignment, 
cContainer, cAssignment).withProfileActions(profileActions, 
Collections.<TopoProfileAction> emptySet());
-            
+
+
+            DynamicState dynamicState = new DynamicState(cAssignment, 
cContainer, cAssignment)
+                .withProfileActions(profileActions, 
Collections.<TopoProfileAction>emptySet());
+
             DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
             verify(cContainer).runProfiling(request, false);
             assertEquals(expectedPending, nextState.pendingStopProfileActions);
             assertEquals(expectedPending, nextState.profileActions);
             assertTrue(Time.currentTimeMillis() > 1000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
             assertEquals(expectedPending, nextState.pendingStopProfileActions);
             assertEquals(expectedPending, nextState.profileActions);
             assertTrue(Time.currentTimeMillis() > 2000);
-            
-            
+
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
             assertEquals(expectedPending, nextState.pendingStopProfileActions);
             assertEquals(expectedPending, nextState.profileActions);
             assertTrue(Time.currentTimeMillis() > 3000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
             verify(cContainer).runProfiling(request, true);
-            assertEquals(Collections.<TopoProfileAction> emptySet(), 
nextState.pendingStopProfileActions);
-            assertEquals(Collections.<TopoProfileAction> emptySet(), 
nextState.profileActions);
+            assertEquals(Collections.<TopoProfileAction>emptySet(), 
nextState.pendingStopProfileActions);
+            assertEquals(Collections.<TopoProfileAction>emptySet(), 
nextState.profileActions);
             assertTrue(Time.currentTimeMillis() > 4000);
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
-            assertEquals(Collections.<TopoProfileAction> emptySet(), 
nextState.pendingStopProfileActions);
-            assertEquals(Collections.<TopoProfileAction> emptySet(), 
nextState.profileActions);
+            assertEquals(Collections.<TopoProfileAction>emptySet(), 
nextState.pendingStopProfileActions);
+            assertEquals(Collections.<TopoProfileAction>emptySet(), 
nextState.profileActions);
             assertTrue(Time.currentTimeMillis() > 5000);
         }
     }
 
     @Test
     public void testResourcesChanged() throws Exception {
-        try (SimulatedTime t = new SimulatedTime(1010)){
+        try (SimulatedTime t = new SimulatedTime(1010)) {
             int port = 8080;
             String cTopoId = "CURRENT";
-            List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
+            List<ExecutorInfo> cExecList = mkExecutorInfoList(1, 2, 3, 4, 5);
             LocalAssignment cAssignment =
                 mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 
100.0, 100.0));
 
@@ -531,7 +538,7 @@ public class SlotTest {
             ISupervisor iSuper = mock(ISupervisor.class);
             long heartbeatTimeoutMs = 5000;
             StaticState staticState = new StaticState(localizer, 
heartbeatTimeoutMs, 120_000, 1000, 1000,
-                containerLauncher, "localhost", port, iSuper, state, cb, null, 
null);
+                                                      containerLauncher, 
"localhost", port, iSuper, state, cb, null, null);
 
             Set<Slot.BlobChanging> changing = new HashSet<>();
             LocallyCachedBlob stormJar = mock(LocallyCachedBlob.class);
@@ -541,7 +548,7 @@ public class SlotTest {
             changing.add(new Slot.BlobChanging(cAssignment, stormJar, 
stormJarLatch));
 
             DynamicState dynamicState = new DynamicState(cAssignment, 
cContainer, cAssignment).withChangingBlobs(changing);
-            
+
             DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState);
             assertEquals(MachineState.KILL_BLOB_UPDATE, nextState.state);
             verify(iSuper).killedWorker(port);
@@ -579,7 +586,7 @@ public class SlotTest {
             assertSame(nContainer, nextState.container);
             assertThat(Time.currentTimeMillis(), greaterThan(2000L));
             assertThat(Time.currentTimeMillis(), lessThan(heartbeatTimeoutMs));
-            
+
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
             assertNull(nextState.pendingChangingBlobsAssignment);

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java 
b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 00a3f98..0e249b6 100644
--- 
a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -1,40 +1,17 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.localizer;
 
-import static org.apache.storm.blobstore.BlobStoreAclHandler.WORLD_EVERYTHING;
-import static org.apache.storm.localizer.LocalizedResource.USERCACHE;
-import static org.junit.Assert.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.endsWith;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.startsWith;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 import com.google.common.base.Joiner;
 import java.io.File;
 import java.io.FileInputStream;
@@ -55,41 +32,61 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.blobstore.LocalFsBlobStore;
 import org.apache.storm.daemon.supervisor.AdvancedFSOps;
 import org.apache.storm.generated.AccessControl;
 import org.apache.storm.generated.AccessControlType;
 import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ReadableBlobMeta;
 import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.security.auth.DefaultPrincipalToLocal;
 import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
-import org.apache.storm.Config;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.generated.ExecutorInfo;
-import org.apache.storm.generated.LocalAssignment;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.security.auth.DefaultPrincipalToLocal;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.storm.blobstore.BlobStoreAclHandler.WORLD_EVERYTHING;
+import static org.apache.storm.localizer.LocalizedResource.USERCACHE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class AsyncLocalizerTest {
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncLocalizerTest.class);
+    private final String user1 = "user1";
+    private final String user2 = "user2";
+    private final String user3 = "user3";
+    //From LocalizerTest
+    private File baseDir;
+    private ClientBlobStore mockblobstore = mock(ClientBlobStore.class);
 
     private static String getTestLocalizerRoot() {
         File f = new File("./target/" + 
Thread.currentThread().getStackTrace()[2].getMethodName() + "/localizer/");
@@ -187,25 +184,25 @@ public class AsyncLocalizerTest {
         final int port = 8080;
         final String simpleLocalName = "simple.txt";
         final String simpleKey = "simple";
-        
+
         final String stormLocal = "/tmp/storm-local/";
         final File userDir = new File(stormLocal, user);
-        final String stormRoot = stormLocal+topoId+"/";
-        
+        final String stormRoot = stormLocal + topoId + "/";
+
         final String localizerRoot = getTestLocalizerRoot();
         final String simpleCurrentLocalFile = localizerRoot + "/usercache/" + 
user + "/filecache/files/simple.current";
-       
+
         final StormTopology st = new StormTopology();
         st.set_spouts(new HashMap<>());
         st.set_bolts(new HashMap<>());
         st.set_state_spouts(new HashMap<>());
- 
+
         Map<String, Map<String, Object>> topoBlobMap = new HashMap<>();
         Map<String, Object> simple = new HashMap<>();
         simple.put("localname", simpleLocalName);
         simple.put("uncompress", false);
         topoBlobMap.put(simpleKey, simple);
-        
+
         Map<String, Object> conf = new HashMap<>();
         conf.put(Config.STORM_LOCAL_DIR, stormLocal);
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
@@ -214,7 +211,7 @@ public class AsyncLocalizerTest {
         Map<String, Object> topoConf = new HashMap<>(conf);
         topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
         topoConf.put(Config.TOPOLOGY_NAME, topoName);
-        
+
         List<LocalizedResource> localizedList = new ArrayList<>();
         LocalizedResource simpleLocal = new LocalizedResource(simpleKey, 
Paths.get(localizerRoot), false, ops, conf, user);
         localizedList.add(simpleLocal);
@@ -228,7 +225,7 @@ public class AsyncLocalizerTest {
 
             //Write the mocking backwards so the actual method is not called 
on the spy object
             doReturn(CompletableFuture.supplyAsync(() -> null)).when(bl)
-                .requestDownloadBaseTopologyBlobs(any(), eq(null));
+                                                               
.requestDownloadBaseTopologyBlobs(any(), eq(null));
             doReturn(userDir).when(bl).getLocalUserFileCacheDir(user);
             doReturn(localizedList).when(bl).getBlobs(any(List.class), any(), 
any());
 
@@ -254,119 +251,9 @@ public class AsyncLocalizerTest {
         }
     }
 
-
-    //From LocalizerTest
-    private File baseDir;
-
-    private final String user1 = "user1";
-    private final String user2 = "user2";
-    private final String user3 = "user3";
-
-    private ClientBlobStore mockblobstore = mock(ClientBlobStore.class);
-
-
-    class TestLocalizer extends AsyncLocalizer {
-
-        TestLocalizer(Map<String, Object> conf, String baseDir) throws 
IOException {
-            super(conf, AdvancedFSOps.make(conf), baseDir);
-        }
-
-        @Override
-        protected ClientBlobStore getClientBlobStore() {
-            return mockblobstore;
-        }
-
-        synchronized void addReferences(List<LocalResource> localresource, 
PortAndAssignment pna, BlobChangingCallback cb) {
-            String user = pna.getOwner();
-            for (LocalResource blob : localresource) {
-                ConcurrentMap<String, LocalizedResource> lrsrcSet = 
blob.shouldUncompress() ? userArchives.get(user) : userFiles.get(user);
-                if (lrsrcSet != null) {
-                    LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName());
-                    if (lrsrc != null) {
-                        lrsrc.addReference(pna, blob.needsCallback() ? cb : 
null);
-                        LOG.debug("added reference for topo: {} key: {}", pna, 
blob);
-                    } else {
-                        LOG.warn("trying to add reference to non-existent 
blob, key: {} topo: {}", blob, pna);
-                    }
-                } else {
-                    LOG.warn("trying to add reference to non-existent local 
resource set, user: {} topo: {}", user, pna);
-                }
-            }
-        }
-
-        void setTargetCacheSize(long size) {
-            cacheTargetSize = size;
-        }
-
-        // For testing, be careful as it doesn't clone
-        ConcurrentHashMap<String, ConcurrentHashMap<String, 
LocalizedResource>> getUserFiles() {
-            return userFiles;
-        }
-
-        ConcurrentHashMap<String, ConcurrentHashMap<String, 
LocalizedResource>> getUserArchives() {
-            return userArchives;
-        }
-
-        /**
-         * This function either returns the blob in the existing cache or if 
it doesn't exist in the
-         * cache, it will download the blob and will block until the download 
is complete.
-         */
-        LocalizedResource getBlob(LocalResource localResource, 
PortAndAssignment pna, BlobChangingCallback cb)
-            throws AuthorizationException, KeyNotFoundException, IOException {
-            ArrayList<LocalResource> arr = new ArrayList<>();
-            arr.add(localResource);
-            List<LocalizedResource> results = getBlobs(arr, pna, cb);
-            if (results.isEmpty() || results.size() != 1) {
-                throw new IOException("Unknown error getting blob: " + 
localResource + ", for user: " + pna.getOwner() +
-                    ", topo: " + pna);
-            }
-            return results.get(0);
-        }
-    }
-
-    class TestInputStreamWithMeta extends InputStreamWithMeta {
-        private final long version;
-        private InputStream iostream;
-
-        public TestInputStreamWithMeta(long version) {
-            iostream = IOUtils.toInputStream("some test data for my input 
stream");
-            this.version = version;
-        }
-
-        public TestInputStreamWithMeta(InputStream istream, long version) {
-            iostream = istream;
-            this.version = version;
-        }
-
-        @Override
-        public long getVersion() throws IOException {
-            return version;
-        }
-
-        @Override
-        public synchronized int read() {
-            return 0;
-        }
-
-        @Override
-        public synchronized int read(byte[] b)
-            throws IOException {
-            int length = iostream.read(b);
-            if (length == 0) {
-                return -1;
-            }
-            return length;
-        }
-
-        @Override
-        public long getFileLength() {
-            return 0;
-        }
-    };
-
     @Before
     public void setUp() throws Exception {
-        baseDir = new File(System.getProperty("java.io.tmpdir") + 
"/blob-store-localizer-test-"+ UUID.randomUUID());
+        baseDir = new File(System.getProperty("java.io.tmpdir") + 
"/blob-store-localizer-test-" + UUID.randomUUID());
         if (!baseDir.mkdir()) {
             throw new IOException("failed to create base directory");
         }
@@ -376,9 +263,12 @@ public class AsyncLocalizerTest {
     public void tearDown() throws Exception {
         try {
             FileUtils.deleteDirectory(baseDir);
-        } catch (IOException ignore) {}
+        } catch (IOException ignore) {
+        }
     }
 
+    ;
+
     protected String joinPath(String... pathList) {
         return Joiner.on(File.separator).join(pathList);
     }
@@ -402,11 +292,11 @@ public class AsyncLocalizerTest {
 
         String expectedDir = constructUserCacheDir(baseDir.toString(), user1);
         assertEquals("get local user dir doesn't return right value",
-            expectedDir, localizer.getLocalUserDir(user1).toString());
+                     expectedDir, localizer.getLocalUserDir(user1).toString());
 
         String expectedFileDir = joinPath(expectedDir, 
LocalizedResource.FILECACHE);
         assertEquals("get local user file dir doesn't return right value",
-            expectedFileDir, 
localizer.getLocalUserFileCacheDir(user1).toString());
+                     expectedFileDir, 
localizer.getLocalUserFileCacheDir(user1).toString());
     }
 
     @Test
@@ -545,7 +435,8 @@ public class AsyncLocalizerTest {
             when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
 
             when(mockblobstore.getBlob(key1)).thenReturn(new 
TestInputStreamWithMeta(new
-                FileInputStream(archiveFile.getAbsolutePath()), 0));
+                                                                               
          FileInputStream(archiveFile.getAbsolutePath()),
+                                                                               
      0));
 
             long timeBefore = Time.currentTimeMillis();
             Time.advanceTime(10);
@@ -583,7 +474,7 @@ public class AsyncLocalizerTest {
             assertEquals("file path doesn't match", keyFile.toPath(), 
key1rsrc.getFilePathWithVersion());
             assertEquals("size doesn't match", size, key1rsrc.getSizeOnDisk());
             assertTrue("timestamp not within range", (key1rsrc.getLastUsed() 
>= timeBefore && key1rsrc
-                .getLastUsed() <= timeAfter));
+                                                                               
                   .getLastUsed() <= timeAfter));
 
             timeBefore = Time.currentTimeMillis();
             Time.advanceTime(10);
@@ -598,7 +489,7 @@ public class AsyncLocalizerTest {
             assertNotNull("Local resource doesn't exist but should", key1rsrc);
             assertEquals("refcount doesn't match " + 
key1rsrc.getDependencies(), false, key1rsrc.isUsed());
             assertTrue("timestamp not within range", (key1rsrc.getLastUsed() 
>= timeBefore && key1rsrc
-                .getLastUsed() <= timeAfter));
+                                                                               
                   .getLastUsed() <= timeAfter));
 
             // should remove the blob since cache size set really small
             localizer.cleanup();
@@ -660,7 +551,7 @@ public class AsyncLocalizerTest {
             assertEquals("file path doesn't match", keyFile.toPath(), 
key1rsrc.getCurrentSymlinkPath());
             assertEquals("size doesn't match", 34, key1rsrc.getSizeOnDisk());
             assertTrue("timestamp not within range", (key1rsrc.getLastUsed() 
>= timeBefore && key1rsrc
-                .getLastUsed() <= timeAfter));
+                                                                               
                   .getLastUsed() <= timeAfter));
 
             timeBefore = Time.currentTimeMillis();
             Time.advanceTime(10);
@@ -675,7 +566,7 @@ public class AsyncLocalizerTest {
             assertNotNull("Local resource doesn't exist but should", key1rsrc);
             assertEquals("refcount doesn't match " + 
key1rsrc.getDependencies(), false, key1rsrc.isUsed());
             assertTrue("timestamp not within range " + timeBefore + " " + 
key1rsrc.getLastUsed() + " " + timeAfter,
-                (key1rsrc.getLastUsed() >= timeBefore && 
key1rsrc.getLastUsed() <= timeAfter));
+                       (key1rsrc.getLastUsed() >= timeBefore && 
key1rsrc.getLastUsed() <= timeAfter));
 
             // should remove the blob since cache size set really small
             localizer.cleanup();
@@ -710,8 +601,10 @@ public class AsyncLocalizerTest {
             when(mockblobstore.getBlob(key2)).thenReturn(new 
TestInputStreamWithMeta(0));
             when(mockblobstore.getBlob(key3)).thenReturn(new 
TestInputStreamWithMeta(0));
 
-            List<LocalResource> keys = Arrays.asList(new LocalResource[]{new 
LocalResource(key1, false, false),
-                new LocalResource(key2, false, false), new LocalResource(key3, 
false, false)});
+            List<LocalResource> keys = Arrays.asList(new LocalResource[]{
+                new LocalResource(key1, false, false),
+                new LocalResource(key2, false, false), new LocalResource(key3, 
false, false)
+            });
             File user1Dir = localizer.getLocalUserFileCacheDir(user1);
             assertTrue("failed to create user dir", user1Dir.mkdirs());
 
@@ -724,7 +617,7 @@ public class AsyncLocalizerTest {
             LocalizedResource lrsrc3 = lrsrcs.get(2);
 
             String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, 
user1,
-                LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
+                                              LocalizedResource.FILECACHE, 
LocalizedResource.FILESDIR);
             assertTrue("user filecache dir not created", new 
File(expectedFileDir).exists());
             File keyFile = new File(expectedFileDir, key1 + 
LocalizedResource.CURRENT_BLOB_SUFFIX);
             File keyFile2 = new File(expectedFileDir, key2 + 
LocalizedResource.CURRENT_BLOB_SUFFIX);
@@ -760,7 +653,7 @@ public class AsyncLocalizerTest {
             lrsrc = localizer.getBlob(new LocalResource(key1, false, false), 
topo1Pna, null);
             LOG.info("Got Blob...");
             assertTrue("timestamp not within range " + timeBefore + " <= " + 
lrsrc.getLastUsed() + " <= " + timeAfter,
-                (lrsrc.getLastUsed() >= timeBefore && lrsrc.getLastUsed() <= 
timeAfter));
+                       (lrsrc.getLastUsed() >= timeBefore && 
lrsrc.getLastUsed() <= timeAfter));
             //Resets the last access time for key1
             localizer.removeBlobReference(lrsrc.getKey(), topo1Pna, false);
 
@@ -829,7 +722,7 @@ public class AsyncLocalizerTest {
         LocalFsBlobStore spy = spy(bs);
         Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1);
         Mockito.doNothing().when(spy).checkForBlobUpdate(key1);
-        spy.prepare(conf,null,null);
+        spy.prepare(conf, null, null);
         spy.getBlob(key1, null);
     }
 
@@ -837,7 +730,7 @@ public class AsyncLocalizerTest {
     public void testMultipleUsers() throws Exception {
         Map<String, Object> conf = new HashMap<>();
         // set clean time really high so doesn't kick in
-        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 
60*60*1000);
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 
60 * 60 * 1000);
 
         String topo1 = "topo1";
         String topo2 = "topo2";
@@ -884,9 +777,9 @@ public class AsyncLocalizerTest {
         String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, 
user1);
         String expectedFileDirUser1 = joinPath(expectedUserDir1, 
LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
         String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, 
user2,
-            LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
+                                               LocalizedResource.FILECACHE, 
LocalizedResource.FILESDIR);
         String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, 
user3,
-            LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
+                                               LocalizedResource.FILECACHE, 
LocalizedResource.FILESDIR);
         assertTrue("user filecache dir user1 not created", new 
File(expectedFileDirUser1).exists());
         assertTrue("user filecache dir user2 not created", new 
File(expectedFileDirUser2).exists());
         assertTrue("user filecache dir user3 not created", new 
File(expectedFileDirUser3).exists());
@@ -929,7 +822,7 @@ public class AsyncLocalizerTest {
     public void testUpdate() throws Exception {
         Map<String, Object> conf = new HashMap<>();
         // set clean time really high so doesn't kick in
-        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 
60*60*1000);
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 
60 * 60 * 1000);
 
         String key1 = "key1";
         String topo1 = "topo1";
@@ -983,6 +876,105 @@ public class AsyncLocalizerTest {
         localizer.updateBlobs();
         assertTrue("blob version file not created", versionFile.exists());
         assertEquals("blob version not correct", 3, 
LocalizedResource.localVersionOfBlob(keyVersionFile));
-        assertTrue("blob file with version 3 not created",  new 
File(expectedFileDir, key1 + ".3").exists());
+        assertTrue("blob file with version 3 not created", new 
File(expectedFileDir, key1 + ".3").exists());
+    }
+
+    class TestLocalizer extends AsyncLocalizer {
+
+        TestLocalizer(Map<String, Object> conf, String baseDir) throws 
IOException {
+            super(conf, AdvancedFSOps.make(conf), baseDir);
+        }
+
+        @Override
+        protected ClientBlobStore getClientBlobStore() {
+            return mockblobstore;
+        }
+
+        synchronized void addReferences(List<LocalResource> localresource, 
PortAndAssignment pna, BlobChangingCallback cb) {
+            String user = pna.getOwner();
+            for (LocalResource blob : localresource) {
+                ConcurrentMap<String, LocalizedResource> lrsrcSet = 
blob.shouldUncompress() ? userArchives.get(user) : userFiles.get(user);
+                if (lrsrcSet != null) {
+                    LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName());
+                    if (lrsrc != null) {
+                        lrsrc.addReference(pna, blob.needsCallback() ? cb : 
null);
+                        LOG.debug("added reference for topo: {} key: {}", pna, 
blob);
+                    } else {
+                        LOG.warn("trying to add reference to non-existent 
blob, key: {} topo: {}", blob, pna);
+                    }
+                } else {
+                    LOG.warn("trying to add reference to non-existent local 
resource set, user: {} topo: {}", user, pna);
+                }
+            }
+        }
+
+        void setTargetCacheSize(long size) {
+            cacheTargetSize = size;
+        }
+
+        // For testing, be careful as it doesn't clone
+        ConcurrentHashMap<String, ConcurrentHashMap<String, 
LocalizedResource>> getUserFiles() {
+            return userFiles;
+        }
+
+        ConcurrentHashMap<String, ConcurrentHashMap<String, 
LocalizedResource>> getUserArchives() {
+            return userArchives;
+        }
+
+        /**
+         * This function either returns the blob in the existing cache or if 
it doesn't exist in the
+         * cache, it will download the blob and will block until the download 
is complete.
+         */
+        LocalizedResource getBlob(LocalResource localResource, 
PortAndAssignment pna, BlobChangingCallback cb)
+            throws AuthorizationException, KeyNotFoundException, IOException {
+            ArrayList<LocalResource> arr = new ArrayList<>();
+            arr.add(localResource);
+            List<LocalizedResource> results = getBlobs(arr, pna, cb);
+            if (results.isEmpty() || results.size() != 1) {
+                throw new IOException("Unknown error getting blob: " + 
localResource + ", for user: " + pna.getOwner() +
+                                      ", topo: " + pna);
+            }
+            return results.get(0);
+        }
+    }
+
+    class TestInputStreamWithMeta extends InputStreamWithMeta {
+        private final long version;
+        private InputStream iostream;
+
+        public TestInputStreamWithMeta(long version) {
+            iostream = IOUtils.toInputStream("some test data for my input 
stream");
+            this.version = version;
+        }
+
+        public TestInputStreamWithMeta(InputStream istream, long version) {
+            iostream = istream;
+            this.version = version;
+        }
+
+        @Override
+        public long getVersion() throws IOException {
+            return version;
+        }
+
+        @Override
+        public synchronized int read() {
+            return 0;
+        }
+
+        @Override
+        public synchronized int read(byte[] b)
+            throws IOException {
+            int length = iostream.read(b);
+            if (length == 0) {
+                return -1;
+            }
+            return length;
+        }
+
+        @Override
+        public long getFileLength() {
+            return 0;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
 
b/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
index 4ebf5cf..a6e8bfd 100644
--- 
a/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.localizer;
 
 
@@ -33,7 +28,9 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 public class LocalizedResourceRetentionSetTest {
 
@@ -76,7 +73,7 @@ public class LocalizedResourceRetentionSetTest {
     @Test
     public void testCleanup() throws Exception {
         ClientBlobStore mockBlobstore = mock(ClientBlobStore.class);
-        when (mockBlobstore.getBlobMeta(any())).thenReturn(new 
ReadableBlobMeta(new SettableBlobMeta(), 1));
+        when(mockBlobstore.getBlobMeta(any())).thenReturn(new 
ReadableBlobMeta(new SettableBlobMeta(), 1));
         PortAndAssignment pna1 = new PortAndAssignment(1, new 
LocalAssignment("topo1", Collections.emptyList()));
         String user = "user";
         Map<String, Object> conf = new HashMap<>();
@@ -86,15 +83,15 @@ public class LocalizedResourceRetentionSetTest {
         ConcurrentMap<String, LocalizedResource> lrArchives = new 
ConcurrentHashMap<>();
         // no reference to key1
         LocalizedResource localresource1 = new LocalizedResource("key1", 
Paths.get("./target/TESTING/testfile1"), false, ops, conf,
-            user);
+                                                                 user);
         localresource1.setSize(10);
         // no reference to archive1
         LocalizedResource archiveresource1 = new LocalizedResource("archive1", 
Paths.get("./target/TESTING/testarchive1"), true, ops,
-            conf, user);
+                                                                   conf, user);
         archiveresource1.setSize(20);
         // reference to key2
         LocalizedResource localresource2 = new LocalizedResource("key2", 
Paths.get("./target/TESTING/testfile2"), false, ops, conf,
-            user);
+                                                                 user);
         localresource2.addReference(pna1, null);
         // check adding reference to local resource with topology of same name
         localresource2.addReference(pna1, null);

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/metric/ClusterMetricsConsumerExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/metric/ClusterMetricsConsumerExecutorTest.java
 
b/storm-server/src/test/java/org/apache/storm/metric/ClusterMetricsConsumerExecutorTest.java
index 0d7b520..dcb6a03 100644
--- 
a/storm-server/src/test/java/org/apache/storm/metric/ClusterMetricsConsumerExecutorTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/metric/ClusterMetricsConsumerExecutorTest.java
@@ -1,32 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.metric;
 
+import java.util.Collection;
+import java.util.Collections;
 import org.apache.storm.metric.api.DataPoint;
 import org.apache.storm.metric.api.IClusterMetricsConsumer;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Collection;
-import java.util.Collections;
-
 import static org.mockito.Mockito.mock;
 
 public class ClusterMetricsConsumerExecutorTest {
@@ -39,7 +32,7 @@ public class ClusterMetricsConsumerExecutorTest {
     @Test
     public void 
testPrepareDoesNotThrowExceptionWhenInitializingClusterMetricsConsumerIsFailing()
 throws Exception {
         ClusterMetricsConsumerExecutor sut = new 
ClusterMetricsConsumerExecutor(
-                MockFailingClusterMetricsConsumer.class.getName(), 2);
+            MockFailingClusterMetricsConsumer.class.getName(), 2);
 
         // it shouldn't propagate any exceptions
         sut.prepare();
@@ -51,7 +44,7 @@ public class ClusterMetricsConsumerExecutorTest {
     @Test
     public void 
testHandleDataPointsWithClusterMetricsShouldSkipHandlingMetricsIfFailedBefore() 
throws Exception {
         ClusterMetricsConsumerExecutor sut = new 
ClusterMetricsConsumerExecutor(
-                MockFailingClusterMetricsConsumer.class.getName(), 2);
+            MockFailingClusterMetricsConsumer.class.getName(), 2);
 
         // below calls shouldn't propagate any exceptions
         sut.prepare();
@@ -66,7 +59,7 @@ public class ClusterMetricsConsumerExecutorTest {
     @Test
     public void 
testHandleDataPointsWithSupervisorMetricsShouldRetryInitializingClusterMetricsConsumerIfFailedBefore()
 throws Exception {
         ClusterMetricsConsumerExecutor sut = new 
ClusterMetricsConsumerExecutor(
-                MockFailingClusterMetricsConsumer.class.getName(), 2);
+            MockFailingClusterMetricsConsumer.class.getName(), 2);
 
         // below calls shouldn't propagate any exceptions
         sut.prepare();
@@ -84,28 +77,6 @@ public class ClusterMetricsConsumerExecutorTest {
         private static int handleDataPointsWithSupervisorInfoCallCount = 0;
         private static int cleanupCallCount = 0;
 
-        @Override
-        public void prepare(Object registrationArgument) {
-            prepareCallCount++;
-
-            throw new RuntimeException("prepare failing...");
-        }
-
-        @Override
-        public void handleDataPoints(ClusterInfo clusterInfo, 
Collection<DataPoint> dataPoints) {
-            handleDataPointsWithClusterInfoCallCount++;
-        }
-
-        @Override
-        public void handleDataPoints(SupervisorInfo supervisorInfo, 
Collection<DataPoint> dataPoints) {
-            handleDataPointsWithSupervisorInfoCallCount++;
-        }
-
-        @Override
-        public void cleanup() {
-            cleanupCallCount++;
-        }
-
         public static int getPrepareCallCount() {
             return prepareCallCount;
         }
@@ -128,6 +99,28 @@ public class ClusterMetricsConsumerExecutorTest {
             handleDataPointsWithSupervisorInfoCallCount = 0;
             cleanupCallCount = 0;
         }
+
+        @Override
+        public void prepare(Object registrationArgument) {
+            prepareCallCount++;
+
+            throw new RuntimeException("prepare failing...");
+        }
+
+        @Override
+        public void handleDataPoints(ClusterInfo clusterInfo, 
Collection<DataPoint> dataPoints) {
+            handleDataPointsWithClusterInfoCallCount++;
+        }
+
+        @Override
+        public void handleDataPoints(SupervisorInfo supervisorInfo, 
Collection<DataPoint> dataPoints) {
+            handleDataPointsWithSupervisorInfoCallCount++;
+        }
+
+        @Override
+        public void cleanup() {
+            cleanupCallCount++;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/nimbus/LocalNimbusTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/nimbus/LocalNimbusTest.java 
b/storm-server/src/test/java/org/apache/storm/nimbus/LocalNimbusTest.java
index 64306a2..916fff0 100644
--- a/storm-server/src/test/java/org/apache/storm/nimbus/LocalNimbusTest.java
+++ b/storm-server/src/test/java/org/apache/storm/nimbus/LocalNimbusTest.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.storm.nimbus;
 
 import java.util.ArrayList;
@@ -42,6 +43,19 @@ import org.junit.Test;
  */
 public class LocalNimbusTest {
 
+    public static StormTopology createTestTopology() {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("words", new TestWordSpout(), 
generateParallelismHint());
+        builder.setBolt("count", new TestWordCounter(), 
generateParallelismHint()).shuffleGrouping("words");
+        builder.setBolt("globalCount", new TestGlobalCount(), 
generateParallelismHint()).shuffleGrouping("count");
+
+        return builder.createTopology();
+    }
+
+    private static int generateParallelismHint() {
+        return new Random().nextInt(9) + 1;
+    }
+
     @Test
     public void testSubmitTopologyToLocalNimbus() throws Exception {
         int port = Utils.getAvailablePort();
@@ -68,19 +82,6 @@ public class LocalNimbusTest {
         }
     }
 
-    public static StormTopology createTestTopology() {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("words", new TestWordSpout(), 
generateParallelismHint());
-        builder.setBolt("count", new TestWordCounter(), 
generateParallelismHint()).shuffleGrouping("words");
-        builder.setBolt("globalCount", new TestGlobalCount(), 
generateParallelismHint()).shuffleGrouping("count");
-
-        return builder.createTopology();
-    }
-
-    private static int generateParallelismHint() {
-        return new Random().nextInt(9)+1;
-    }
-
     public static class InmemoryTopologySubmitterHook implements 
ISubmitterHook {
         public static final List<TopologyDetails> submittedTopologies = new 
ArrayList<>();
 
@@ -106,8 +107,9 @@ public class LocalNimbusTest {
 
             TopologyDetails that = (TopologyDetails) o;
 
-            if (topologyName != null ? !topologyName.equals(that.topologyName) 
: that.topologyName != null)
+            if (topologyName != null ? !topologyName.equals(that.topologyName) 
: that.topologyName != null) {
                 return false;
+            }
             return !(stormTopology != null ? 
!stormTopology.equals(that.stormTopology) : that.stormTopology != null);
 
         }
@@ -122,9 +124,9 @@ public class LocalNimbusTest {
         @Override
         public String toString() {
             return "TopologyDetails{" +
-                    "topologyName='" + topologyName + '\'' +
-                    ", stormTopology=" + stormTopology +
-                    '}';
+                   "topologyName='" + topologyName + '\'' +
+                   ", stormTopology=" + stormTopology +
+                   '}';
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java 
b/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java
index b568b04..3b9eb14 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.scheduler;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.junit.Assert;
 import org.junit.Test;
@@ -99,8 +93,8 @@ public class ClusterTest {
 
     @Test
     public void getAssignedMemoryForSlot_topologyWorkerLwChildopts() {
-        singleValueTest(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m", 
-                TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION + 64.0);
+        singleValueTest(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m",
+                        TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION + 64.0);
     }
 
     @Test

Reply via email to