http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java index 2867e6a..2889cd1 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java @@ -1,26 +1,16 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.daemon.supervisor; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +package org.apache.storm.daemon.supervisor; import java.util.ArrayList; import java.util.Collections; @@ -29,11 +19,10 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - -import org.apache.storm.daemon.supervisor.Slot.StaticState; -import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction; import org.apache.storm.daemon.supervisor.Slot.DynamicState; import org.apache.storm.daemon.supervisor.Slot.MachineState; +import org.apache.storm.daemon.supervisor.Slot.StaticState; +import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction; import org.apache.storm.generated.ExecutorInfo; import org.apache.storm.generated.LSWorkerHeartbeat; import org.apache.storm.generated.LocalAssignment; @@ -53,6 +42,23 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class SlotTest { private static final Logger LOG = LoggerFactory.getLogger(SlotTest.class); @@ -61,17 +67,17 @@ public class SlotTest { if (cpu != null) { resources.set_cpu(cpu); } - + if (mem_on_heap != null) { resources.set_mem_on_heap(mem_on_heap); } - + if (mem_off_heap != null) { resources.set_mem_off_heap(mem_off_heap); } return resources; } - + static LSWorkerHeartbeat mkWorkerHB(String id, int port, List<ExecutorInfo> exec, Integer timeSecs) { LSWorkerHeartbeat ret = new LSWorkerHeartbeat(); ret.set_topology_id(id); @@ -80,8 +86,8 @@ public class SlotTest { ret.set_time_secs(timeSecs); return ret; } - - static List<ExecutorInfo> mkExecutorInfoList(int ... executors) { + + static List<ExecutorInfo> mkExecutorInfoList(int... executors) { ArrayList<ExecutorInfo> ret = new ArrayList<>(executors.length); for (int exec : executors) { ExecutorInfo execInfo = new ExecutorInfo(); @@ -91,7 +97,7 @@ public class SlotTest { } return ret; } - + static LocalAssignment mkLocalAssignment(String id, List<ExecutorInfo> exec, WorkerResources resources) { LocalAssignment ret = new LocalAssignment(); ret.set_topology_id(id); @@ -101,19 +107,19 @@ public class SlotTest { } return ret; } - + @Test public void testEquivilant() { - LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 100.0, 100.0)); - LocalAssignment aResized = mkLocalAssignment("A", mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 200.0, 100.0)); - LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1,2,3,4,5,6), mkWorkerResources(100.0, 100.0, 100.0)); - LocalAssignment bReordered = mkLocalAssignment("B", mkExecutorInfoList(6,5,4,3,2,1), mkWorkerResources(100.0, 100.0, 100.0)); + LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 100.0, 100.0)); + LocalAssignment aResized = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 200.0, 100.0)); + LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1, 2, 3, 4, 5, 6), mkWorkerResources(100.0, 100.0, 100.0)); + LocalAssignment bReordered = mkLocalAssignment("B", mkExecutorInfoList(6, 5, 4, 3, 2, 1), mkWorkerResources(100.0, 100.0, 100.0)); assertTrue(Slot.equivalent(null, null)); assertTrue(Slot.equivalent(a, a)); assertTrue(Slot.equivalent(b, bReordered)); assertTrue(Slot.equivalent(bReordered, b)); - + assertFalse(Slot.equivalent(a, aResized)); assertFalse(Slot.equivalent(aResized, a)); assertFalse(Slot.equivalent(a, null)); @@ -123,10 +129,10 @@ public class SlotTest { @Test public void testForSameTopology() { - LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 100.0, 100.0)); - LocalAssignment aResized = mkLocalAssignment("A", mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 200.0, 100.0)); - LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1,2,3,4,5,6), mkWorkerResources(100.0, 100.0, 100.0)); - LocalAssignment bReordered = mkLocalAssignment("B", mkExecutorInfoList(6,5,4,3,2,1), mkWorkerResources(100.0, 100.0, 100.0)); + LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 100.0, 100.0)); + LocalAssignment aResized = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 200.0, 100.0)); + LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1, 2, 3, 4, 5, 6), mkWorkerResources(100.0, 100.0, 100.0)); + LocalAssignment bReordered = mkLocalAssignment("B", mkExecutorInfoList(6, 5, 4, 3, 2, 1), mkWorkerResources(100.0, 100.0, 100.0)); assertTrue(Slot.forSameTopology(null, null)); assertTrue(Slot.forSameTopology(a, a)); @@ -142,29 +148,29 @@ public class SlotTest { @Test public void testEmptyToEmpty() throws Exception { - try (SimulatedTime t = new SimulatedTime(1010)){ + try (SimulatedTime t = new SimulatedTime(1010)) { AsyncLocalizer localizer = mock(AsyncLocalizer.class); LocalState state = mock(LocalState.class); BlobChangingCallback cb = mock(BlobChangingCallback.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); ISupervisor iSuper = mock(ISupervisor.class); StaticState staticState = new StaticState(localizer, 1000, 1000, 1000, 1000, - containerLauncher, "localhost", 8080, iSuper, state, cb, null, null); + containerLauncher, "localhost", 8080, iSuper, state, cb, null, null); DynamicState dynamicState = new DynamicState(null, null, null); DynamicState nextState = Slot.handleEmpty(dynamicState, staticState); assertEquals(MachineState.EMPTY, nextState.state); assertTrue(Time.currentTimeMillis() > 1000); } } - + @Test public void testLaunchContainerFromEmpty() throws Exception { - try (SimulatedTime t = new SimulatedTime(1010)){ + try (SimulatedTime t = new SimulatedTime(1010)) { int port = 8080; String topoId = "NEW"; - List<ExecutorInfo> execList = mkExecutorInfoList(1,2,3,4,5); - LocalAssignment newAssignment = - mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0)); + List<ExecutorInfo> execList = mkExecutorInfoList(1, 2, 3, 4, 5); + LocalAssignment newAssignment = + mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0)); AsyncLocalizer localizer = mock(AsyncLocalizer.class); BlobChangingCallback cb = mock(BlobChangingCallback.class); @@ -178,12 +184,12 @@ public class SlotTest { @SuppressWarnings("unchecked") CompletableFuture<Void> blobFuture = mock(CompletableFuture.class); when(localizer.requestDownloadTopologyBlobs(newAssignment, port, cb)).thenReturn(blobFuture); - + ISupervisor iSuper = mock(ISupervisor.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); DynamicState dynamicState = new DynamicState(null, null, null) - .withNewAssignment(newAssignment); + .withNewAssignment(newAssignment); DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); verify(localizer).requestDownloadTopologyBlobs(newAssignment, port, cb); @@ -191,7 +197,7 @@ public class SlotTest { assertSame("pendingDownload not set properly", blobFuture, nextState.pendingDownload); assertEquals(newAssignment, nextState.pendingLocalization); assertEquals(0, Time.currentTimeMillis()); - + nextState = Slot.stateMachineStep(nextState, staticState); verify(blobFuture).get(1000, TimeUnit.MILLISECONDS); verify(containerLauncher).launchContainer(port, newAssignment, state); @@ -201,7 +207,7 @@ public class SlotTest { assertSame(newAssignment, nextState.currentAssignment); assertSame(container, nextState.container); assertEquals(0, Time.currentTimeMillis()); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); assertSame("pendingDownload is not null", null, nextState.pendingDownload); @@ -209,7 +215,7 @@ public class SlotTest { assertSame(newAssignment, nextState.currentAssignment); assertSame(container, nextState.container); assertEquals(0, Time.currentTimeMillis()); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); assertSame("pendingDownload is not null", null, nextState.pendingDownload); @@ -217,7 +223,7 @@ public class SlotTest { assertSame(newAssignment, nextState.currentAssignment); assertSame(container, nextState.container); assertTrue(Time.currentTimeMillis() > 1000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); assertSame("pendingDownload is not null", null, nextState.pendingDownload); @@ -231,59 +237,59 @@ public class SlotTest { @Test public void testRelaunch() throws Exception { - try (SimulatedTime t = new SimulatedTime(1010)){ + try (SimulatedTime t = new SimulatedTime(1010)) { int port = 8080; String topoId = "CURRENT"; - List<ExecutorInfo> execList = mkExecutorInfoList(1,2,3,4,5); - LocalAssignment assignment = - mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0)); - + List<ExecutorInfo> execList = mkExecutorInfoList(1, 2, 3, 4, 5); + LocalAssignment assignment = + mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0)); + AsyncLocalizer localizer = mock(AsyncLocalizer.class); BlobChangingCallback cb = mock(BlobChangingCallback.class); Container container = mock(Container.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); - LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs()-10); + LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs() - 10); LSWorkerHeartbeat goodhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs()); when(container.readHeartbeat()).thenReturn(oldhb, oldhb, goodhb, goodhb); when(container.areAllProcessesDead()).thenReturn(false, true); - + ISupervisor iSuper = mock(ISupervisor.class); LocalState state = mock(LocalState.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); DynamicState dynamicState = new DynamicState(assignment, container, assignment); - + DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state); verify(container).kill(); assertTrue(Time.currentTimeMillis() > 1000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state); verify(container).forceKill(); assertTrue(Time.currentTimeMillis() > 2000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state); verify(container).relaunch(); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state); assertTrue(Time.currentTimeMillis() > 3000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); } } - + @Test public void testReschedule() throws Exception { - try (SimulatedTime t = new SimulatedTime(1010)){ + try (SimulatedTime t = new SimulatedTime(1010)) { int port = 8080; String cTopoId = "CURRENT"; - List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5); - LocalAssignment cAssignment = - mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0)); + List<ExecutorInfo> cExecList = mkExecutorInfoList(1, 2, 3, 4, 5); + LocalAssignment cAssignment = + mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0)); BlobChangingCallback cb = mock(BlobChangingCallback.class); @@ -291,12 +297,12 @@ public class SlotTest { LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs()); when(cContainer.readHeartbeat()).thenReturn(chb); when(cContainer.areAllProcessesDead()).thenReturn(false, true); - + String nTopoId = "NEW"; - List<ExecutorInfo> nExecList = mkExecutorInfoList(1,2,3,4,5); - LocalAssignment nAssignment = - mkLocalAssignment(nTopoId, nExecList, mkWorkerResources(100.0, 100.0, 100.0)); - + List<ExecutorInfo> nExecList = mkExecutorInfoList(1, 2, 3, 4, 5); + LocalAssignment nAssignment = + mkLocalAssignment(nTopoId, nExecList, mkWorkerResources(100.0, 100.0, 100.0)); + AsyncLocalizer localizer = mock(AsyncLocalizer.class); Container nContainer = mock(Container.class); LocalState state = mock(LocalState.class); @@ -304,16 +310,16 @@ public class SlotTest { when(containerLauncher.launchContainer(port, nAssignment, state)).thenReturn(nContainer); LSWorkerHeartbeat nhb = mkWorkerHB(nTopoId, 100, nExecList, Time.currentTimeSecs()); when(nContainer.readHeartbeat()).thenReturn(nhb, nhb); - + @SuppressWarnings("unchecked") CompletableFuture<Void> blobFuture = mock(CompletableFuture.class); when(localizer.requestDownloadTopologyBlobs(nAssignment, port, cb)).thenReturn(blobFuture); - + ISupervisor iSuper = mock(ISupervisor.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); DynamicState dynamicState = new DynamicState(cAssignment, cContainer, nAssignment); - + DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); assertEquals(MachineState.KILL, nextState.state); verify(cContainer).kill(); @@ -321,20 +327,20 @@ public class SlotTest { assertSame("pendingDownload not set properly", blobFuture, nextState.pendingDownload); assertEquals(nAssignment, nextState.pendingLocalization); assertTrue(Time.currentTimeMillis() > 1000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.KILL, nextState.state); verify(cContainer).forceKill(); assertSame("pendingDownload not set properly", blobFuture, nextState.pendingDownload); assertEquals(nAssignment, nextState.pendingLocalization); assertTrue(Time.currentTimeMillis() > 2000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, nextState.state); verify(cContainer).cleanUp(); verify(localizer).releaseSlotFor(cAssignment, port); assertTrue(Time.currentTimeMillis() > 2000); - + nextState = Slot.stateMachineStep(nextState, staticState); verify(blobFuture).get(1000, TimeUnit.MILLISECONDS); verify(containerLauncher).launchContainer(port, nAssignment, state); @@ -344,7 +350,7 @@ public class SlotTest { assertSame(nAssignment, nextState.currentAssignment); assertSame(nContainer, nextState.container); assertTrue(Time.currentTimeMillis() > 2000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); assertSame("pendingDownload is not null", null, nextState.pendingDownload); @@ -352,7 +358,7 @@ public class SlotTest { assertSame(nAssignment, nextState.currentAssignment); assertSame(nContainer, nextState.container); assertTrue(Time.currentTimeMillis() > 2000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); assertSame("pendingDownload is not null", null, nextState.pendingDownload); @@ -360,7 +366,7 @@ public class SlotTest { assertSame(nAssignment, nextState.currentAssignment); assertSame(nContainer, nextState.container); assertTrue(Time.currentTimeMillis() > 3000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); assertSame("pendingDownload is not null", null, nextState.pendingDownload); @@ -370,31 +376,31 @@ public class SlotTest { assertTrue(Time.currentTimeMillis() > 4000); } } - + @Test public void testRunningToEmpty() throws Exception { - try (SimulatedTime t = new SimulatedTime(1010)){ + try (SimulatedTime t = new SimulatedTime(1010)) { int port = 8080; String cTopoId = "CURRENT"; - List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5); - LocalAssignment cAssignment = - mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0)); - + List<ExecutorInfo> cExecList = mkExecutorInfoList(1, 2, 3, 4, 5); + LocalAssignment cAssignment = + mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0)); + Container cContainer = mock(Container.class); LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs()); when(cContainer.readHeartbeat()).thenReturn(chb); when(cContainer.areAllProcessesDead()).thenReturn(false, true); - + AsyncLocalizer localizer = mock(AsyncLocalizer.class); BlobChangingCallback cb = mock(BlobChangingCallback.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); - + ISupervisor iSuper = mock(ISupervisor.class); LocalState state = mock(LocalState.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); DynamicState dynamicState = new DynamicState(cAssignment, cContainer, null); - + DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); assertEquals(MachineState.KILL, nextState.state); verify(cContainer).kill(); @@ -402,14 +408,14 @@ public class SlotTest { assertSame("pendingDownload not set properly", null, nextState.pendingDownload); assertEquals(null, nextState.pendingLocalization); assertTrue(Time.currentTimeMillis() > 1000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.KILL, nextState.state); verify(cContainer).forceKill(); assertSame("pendingDownload not set properly", null, nextState.pendingDownload); assertEquals(null, nextState.pendingLocalization); assertTrue(Time.currentTimeMillis() > 2000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.EMPTY, nextState.state); verify(cContainer).cleanUp(); @@ -417,13 +423,13 @@ public class SlotTest { assertEquals(null, nextState.container); assertEquals(null, nextState.currentAssignment); assertTrue(Time.currentTimeMillis() > 2000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.EMPTY, nextState.state); assertEquals(null, nextState.container); assertEquals(null, nextState.currentAssignment); assertTrue(Time.currentTimeMillis() > 3000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.EMPTY, nextState.state); assertEquals(null, nextState.container); @@ -431,29 +437,29 @@ public class SlotTest { assertTrue(Time.currentTimeMillis() > 3000); } } - + @Test public void testRunWithProfileActions() throws Exception { - try (SimulatedTime t = new SimulatedTime(1010)){ + try (SimulatedTime t = new SimulatedTime(1010)) { int port = 8080; String cTopoId = "CURRENT"; - List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5); - LocalAssignment cAssignment = - mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0)); - + List<ExecutorInfo> cExecList = mkExecutorInfoList(1, 2, 3, 4, 5); + LocalAssignment cAssignment = + mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0)); + Container cContainer = mock(Container.class); - LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs()+100); //NOT going to timeout for a while + LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs() + 100); //NOT going to timeout for a while when(cContainer.readHeartbeat()).thenReturn(chb, chb, chb, chb, chb, chb); when(cContainer.runProfiling(any(ProfileRequest.class), anyBoolean())).thenReturn(true); - + AsyncLocalizer localizer = mock(AsyncLocalizer.class); BlobChangingCallback cb = mock(BlobChangingCallback.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); - + ISupervisor iSuper = mock(ISupervisor.class); LocalState state = mock(LocalState.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); Set<TopoProfileAction> profileActions = new HashSet<>(); ProfileRequest request = new ProfileRequest(); request.set_action(ProfileAction.JPROFILE_STOP); @@ -462,56 +468,57 @@ public class SlotTest { info.add_to_port(port); request.set_nodeInfo(info); request.set_time_stamp(Time.currentTimeMillis() + 3000);//3 seconds from now - + TopoProfileAction profile = new TopoProfileAction(cTopoId, request); profileActions.add(profile); Set<TopoProfileAction> expectedPending = new HashSet<>(); expectedPending.add(profile); - - - DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment).withProfileActions(profileActions, Collections.<TopoProfileAction> emptySet()); - + + + DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment) + .withProfileActions(profileActions, Collections.<TopoProfileAction>emptySet()); + DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); assertEquals(MachineState.RUNNING, nextState.state); verify(cContainer).runProfiling(request, false); assertEquals(expectedPending, nextState.pendingStopProfileActions); assertEquals(expectedPending, nextState.profileActions); assertTrue(Time.currentTimeMillis() > 1000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); assertEquals(expectedPending, nextState.pendingStopProfileActions); assertEquals(expectedPending, nextState.profileActions); assertTrue(Time.currentTimeMillis() > 2000); - - + + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); assertEquals(expectedPending, nextState.pendingStopProfileActions); assertEquals(expectedPending, nextState.profileActions); assertTrue(Time.currentTimeMillis() > 3000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); verify(cContainer).runProfiling(request, true); - assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.pendingStopProfileActions); - assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.profileActions); + assertEquals(Collections.<TopoProfileAction>emptySet(), nextState.pendingStopProfileActions); + assertEquals(Collections.<TopoProfileAction>emptySet(), nextState.profileActions); assertTrue(Time.currentTimeMillis() > 4000); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); - assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.pendingStopProfileActions); - assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.profileActions); + assertEquals(Collections.<TopoProfileAction>emptySet(), nextState.pendingStopProfileActions); + assertEquals(Collections.<TopoProfileAction>emptySet(), nextState.profileActions); assertTrue(Time.currentTimeMillis() > 5000); } } @Test public void testResourcesChanged() throws Exception { - try (SimulatedTime t = new SimulatedTime(1010)){ + try (SimulatedTime t = new SimulatedTime(1010)) { int port = 8080; String cTopoId = "CURRENT"; - List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5); + List<ExecutorInfo> cExecList = mkExecutorInfoList(1, 2, 3, 4, 5); LocalAssignment cAssignment = mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0)); @@ -531,7 +538,7 @@ public class SlotTest { ISupervisor iSuper = mock(ISupervisor.class); long heartbeatTimeoutMs = 5000; StaticState staticState = new StaticState(localizer, heartbeatTimeoutMs, 120_000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); Set<Slot.BlobChanging> changing = new HashSet<>(); LocallyCachedBlob stormJar = mock(LocallyCachedBlob.class); @@ -541,7 +548,7 @@ public class SlotTest { changing.add(new Slot.BlobChanging(cAssignment, stormJar, stormJarLatch)); DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment).withChangingBlobs(changing); - + DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); assertEquals(MachineState.KILL_BLOB_UPDATE, nextState.state); verify(iSuper).killedWorker(port); @@ -579,7 +586,7 @@ public class SlotTest { assertSame(nContainer, nextState.container); assertThat(Time.currentTimeMillis(), greaterThan(2000L)); assertThat(Time.currentTimeMillis(), lessThan(heartbeatTimeoutMs)); - + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); assertNull(nextState.pendingChangingBlobsAssignment);
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java index 00a3f98..0e249b6 100644 --- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java +++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java @@ -1,40 +1,17 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.localizer; -import static org.apache.storm.blobstore.BlobStoreAclHandler.WORLD_EVERYTHING; -import static org.apache.storm.localizer.LocalizedResource.USERCACHE; -import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.endsWith; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.startsWith; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.google.common.base.Joiner; import java.io.File; import java.io.FileInputStream; @@ -55,41 +32,61 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.blobstore.BlobStoreAclHandler; +import org.apache.storm.blobstore.ClientBlobStore; import org.apache.storm.blobstore.InputStreamWithMeta; import org.apache.storm.blobstore.LocalFsBlobStore; import org.apache.storm.daemon.supervisor.AdvancedFSOps; import org.apache.storm.generated.AccessControl; import org.apache.storm.generated.AccessControlType; import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.ExecutorInfo; import org.apache.storm.generated.KeyNotFoundException; +import org.apache.storm.generated.LocalAssignment; import org.apache.storm.generated.ReadableBlobMeta; import org.apache.storm.generated.SettableBlobMeta; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.security.auth.DefaultPrincipalToLocal; import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.ReflectionUtils; +import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; import org.junit.After; import org.junit.Before; import org.junit.Test; - -import org.apache.storm.Config; -import org.apache.storm.blobstore.ClientBlobStore; -import org.apache.storm.generated.ExecutorInfo; -import org.apache.storm.generated.LocalAssignment; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.security.auth.DefaultPrincipalToLocal; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.storm.blobstore.BlobStoreAclHandler.WORLD_EVERYTHING; +import static org.apache.storm.localizer.LocalizedResource.USERCACHE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class AsyncLocalizerTest { private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizerTest.class); + private final String user1 = "user1"; + private final String user2 = "user2"; + private final String user3 = "user3"; + //From LocalizerTest + private File baseDir; + private ClientBlobStore mockblobstore = mock(ClientBlobStore.class); private static String getTestLocalizerRoot() { File f = new File("./target/" + Thread.currentThread().getStackTrace()[2].getMethodName() + "/localizer/"); @@ -187,25 +184,25 @@ public class AsyncLocalizerTest { final int port = 8080; final String simpleLocalName = "simple.txt"; final String simpleKey = "simple"; - + final String stormLocal = "/tmp/storm-local/"; final File userDir = new File(stormLocal, user); - final String stormRoot = stormLocal+topoId+"/"; - + final String stormRoot = stormLocal + topoId + "/"; + final String localizerRoot = getTestLocalizerRoot(); final String simpleCurrentLocalFile = localizerRoot + "/usercache/" + user + "/filecache/files/simple.current"; - + final StormTopology st = new StormTopology(); st.set_spouts(new HashMap<>()); st.set_bolts(new HashMap<>()); st.set_state_spouts(new HashMap<>()); - + Map<String, Map<String, Object>> topoBlobMap = new HashMap<>(); Map<String, Object> simple = new HashMap<>(); simple.put("localname", simpleLocalName); simple.put("uncompress", false); topoBlobMap.put(simpleKey, simple); - + Map<String, Object> conf = new HashMap<>(); conf.put(Config.STORM_LOCAL_DIR, stormLocal); AdvancedFSOps ops = mock(AdvancedFSOps.class); @@ -214,7 +211,7 @@ public class AsyncLocalizerTest { Map<String, Object> topoConf = new HashMap<>(conf); topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap); topoConf.put(Config.TOPOLOGY_NAME, topoName); - + List<LocalizedResource> localizedList = new ArrayList<>(); LocalizedResource simpleLocal = new LocalizedResource(simpleKey, Paths.get(localizerRoot), false, ops, conf, user); localizedList.add(simpleLocal); @@ -228,7 +225,7 @@ public class AsyncLocalizerTest { //Write the mocking backwards so the actual method is not called on the spy object doReturn(CompletableFuture.supplyAsync(() -> null)).when(bl) - .requestDownloadBaseTopologyBlobs(any(), eq(null)); + .requestDownloadBaseTopologyBlobs(any(), eq(null)); doReturn(userDir).when(bl).getLocalUserFileCacheDir(user); doReturn(localizedList).when(bl).getBlobs(any(List.class), any(), any()); @@ -254,119 +251,9 @@ public class AsyncLocalizerTest { } } - - //From LocalizerTest - private File baseDir; - - private final String user1 = "user1"; - private final String user2 = "user2"; - private final String user3 = "user3"; - - private ClientBlobStore mockblobstore = mock(ClientBlobStore.class); - - - class TestLocalizer extends AsyncLocalizer { - - TestLocalizer(Map<String, Object> conf, String baseDir) throws IOException { - super(conf, AdvancedFSOps.make(conf), baseDir); - } - - @Override - protected ClientBlobStore getClientBlobStore() { - return mockblobstore; - } - - synchronized void addReferences(List<LocalResource> localresource, PortAndAssignment pna, BlobChangingCallback cb) { - String user = pna.getOwner(); - for (LocalResource blob : localresource) { - ConcurrentMap<String, LocalizedResource> lrsrcSet = blob.shouldUncompress() ? userArchives.get(user) : userFiles.get(user); - if (lrsrcSet != null) { - LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName()); - if (lrsrc != null) { - lrsrc.addReference(pna, blob.needsCallback() ? cb : null); - LOG.debug("added reference for topo: {} key: {}", pna, blob); - } else { - LOG.warn("trying to add reference to non-existent blob, key: {} topo: {}", blob, pna); - } - } else { - LOG.warn("trying to add reference to non-existent local resource set, user: {} topo: {}", user, pna); - } - } - } - - void setTargetCacheSize(long size) { - cacheTargetSize = size; - } - - // For testing, be careful as it doesn't clone - ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> getUserFiles() { - return userFiles; - } - - ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> getUserArchives() { - return userArchives; - } - - /** - * This function either returns the blob in the existing cache or if it doesn't exist in the - * cache, it will download the blob and will block until the download is complete. - */ - LocalizedResource getBlob(LocalResource localResource, PortAndAssignment pna, BlobChangingCallback cb) - throws AuthorizationException, KeyNotFoundException, IOException { - ArrayList<LocalResource> arr = new ArrayList<>(); - arr.add(localResource); - List<LocalizedResource> results = getBlobs(arr, pna, cb); - if (results.isEmpty() || results.size() != 1) { - throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + pna.getOwner() + - ", topo: " + pna); - } - return results.get(0); - } - } - - class TestInputStreamWithMeta extends InputStreamWithMeta { - private final long version; - private InputStream iostream; - - public TestInputStreamWithMeta(long version) { - iostream = IOUtils.toInputStream("some test data for my input stream"); - this.version = version; - } - - public TestInputStreamWithMeta(InputStream istream, long version) { - iostream = istream; - this.version = version; - } - - @Override - public long getVersion() throws IOException { - return version; - } - - @Override - public synchronized int read() { - return 0; - } - - @Override - public synchronized int read(byte[] b) - throws IOException { - int length = iostream.read(b); - if (length == 0) { - return -1; - } - return length; - } - - @Override - public long getFileLength() { - return 0; - } - }; - @Before public void setUp() throws Exception { - baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-"+ UUID.randomUUID()); + baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-" + UUID.randomUUID()); if (!baseDir.mkdir()) { throw new IOException("failed to create base directory"); } @@ -376,9 +263,12 @@ public class AsyncLocalizerTest { public void tearDown() throws Exception { try { FileUtils.deleteDirectory(baseDir); - } catch (IOException ignore) {} + } catch (IOException ignore) { + } } + ; + protected String joinPath(String... pathList) { return Joiner.on(File.separator).join(pathList); } @@ -402,11 +292,11 @@ public class AsyncLocalizerTest { String expectedDir = constructUserCacheDir(baseDir.toString(), user1); assertEquals("get local user dir doesn't return right value", - expectedDir, localizer.getLocalUserDir(user1).toString()); + expectedDir, localizer.getLocalUserDir(user1).toString()); String expectedFileDir = joinPath(expectedDir, LocalizedResource.FILECACHE); assertEquals("get local user file dir doesn't return right value", - expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString()); + expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString()); } @Test @@ -545,7 +435,8 @@ public class AsyncLocalizerTest { when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm); when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new - FileInputStream(archiveFile.getAbsolutePath()), 0)); + FileInputStream(archiveFile.getAbsolutePath()), + 0)); long timeBefore = Time.currentTimeMillis(); Time.advanceTime(10); @@ -583,7 +474,7 @@ public class AsyncLocalizerTest { assertEquals("file path doesn't match", keyFile.toPath(), key1rsrc.getFilePathWithVersion()); assertEquals("size doesn't match", size, key1rsrc.getSizeOnDisk()); assertTrue("timestamp not within range", (key1rsrc.getLastUsed() >= timeBefore && key1rsrc - .getLastUsed() <= timeAfter)); + .getLastUsed() <= timeAfter)); timeBefore = Time.currentTimeMillis(); Time.advanceTime(10); @@ -598,7 +489,7 @@ public class AsyncLocalizerTest { assertNotNull("Local resource doesn't exist but should", key1rsrc); assertEquals("refcount doesn't match " + key1rsrc.getDependencies(), false, key1rsrc.isUsed()); assertTrue("timestamp not within range", (key1rsrc.getLastUsed() >= timeBefore && key1rsrc - .getLastUsed() <= timeAfter)); + .getLastUsed() <= timeAfter)); // should remove the blob since cache size set really small localizer.cleanup(); @@ -660,7 +551,7 @@ public class AsyncLocalizerTest { assertEquals("file path doesn't match", keyFile.toPath(), key1rsrc.getCurrentSymlinkPath()); assertEquals("size doesn't match", 34, key1rsrc.getSizeOnDisk()); assertTrue("timestamp not within range", (key1rsrc.getLastUsed() >= timeBefore && key1rsrc - .getLastUsed() <= timeAfter)); + .getLastUsed() <= timeAfter)); timeBefore = Time.currentTimeMillis(); Time.advanceTime(10); @@ -675,7 +566,7 @@ public class AsyncLocalizerTest { assertNotNull("Local resource doesn't exist but should", key1rsrc); assertEquals("refcount doesn't match " + key1rsrc.getDependencies(), false, key1rsrc.isUsed()); assertTrue("timestamp not within range " + timeBefore + " " + key1rsrc.getLastUsed() + " " + timeAfter, - (key1rsrc.getLastUsed() >= timeBefore && key1rsrc.getLastUsed() <= timeAfter)); + (key1rsrc.getLastUsed() >= timeBefore && key1rsrc.getLastUsed() <= timeAfter)); // should remove the blob since cache size set really small localizer.cleanup(); @@ -710,8 +601,10 @@ public class AsyncLocalizerTest { when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta(0)); when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta(0)); - List<LocalResource> keys = Arrays.asList(new LocalResource[]{new LocalResource(key1, false, false), - new LocalResource(key2, false, false), new LocalResource(key3, false, false)}); + List<LocalResource> keys = Arrays.asList(new LocalResource[]{ + new LocalResource(key1, false, false), + new LocalResource(key2, false, false), new LocalResource(key3, false, false) + }); File user1Dir = localizer.getLocalUserFileCacheDir(user1); assertTrue("failed to create user dir", user1Dir.mkdirs()); @@ -724,7 +617,7 @@ public class AsyncLocalizerTest { LocalizedResource lrsrc3 = lrsrcs.get(2); String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, user1, - LocalizedResource.FILECACHE, LocalizedResource.FILESDIR); + LocalizedResource.FILECACHE, LocalizedResource.FILESDIR); assertTrue("user filecache dir not created", new File(expectedFileDir).exists()); File keyFile = new File(expectedFileDir, key1 + LocalizedResource.CURRENT_BLOB_SUFFIX); File keyFile2 = new File(expectedFileDir, key2 + LocalizedResource.CURRENT_BLOB_SUFFIX); @@ -760,7 +653,7 @@ public class AsyncLocalizerTest { lrsrc = localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null); LOG.info("Got Blob..."); assertTrue("timestamp not within range " + timeBefore + " <= " + lrsrc.getLastUsed() + " <= " + timeAfter, - (lrsrc.getLastUsed() >= timeBefore && lrsrc.getLastUsed() <= timeAfter)); + (lrsrc.getLastUsed() >= timeBefore && lrsrc.getLastUsed() <= timeAfter)); //Resets the last access time for key1 localizer.removeBlobReference(lrsrc.getKey(), topo1Pna, false); @@ -829,7 +722,7 @@ public class AsyncLocalizerTest { LocalFsBlobStore spy = spy(bs); Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1); Mockito.doNothing().when(spy).checkForBlobUpdate(key1); - spy.prepare(conf,null,null); + spy.prepare(conf, null, null); spy.getBlob(key1, null); } @@ -837,7 +730,7 @@ public class AsyncLocalizerTest { public void testMultipleUsers() throws Exception { Map<String, Object> conf = new HashMap<>(); // set clean time really high so doesn't kick in - conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); + conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000); String topo1 = "topo1"; String topo2 = "topo2"; @@ -884,9 +777,9 @@ public class AsyncLocalizerTest { String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, user1); String expectedFileDirUser1 = joinPath(expectedUserDir1, LocalizedResource.FILECACHE, LocalizedResource.FILESDIR); String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, user2, - LocalizedResource.FILECACHE, LocalizedResource.FILESDIR); + LocalizedResource.FILECACHE, LocalizedResource.FILESDIR); String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, user3, - LocalizedResource.FILECACHE, LocalizedResource.FILESDIR); + LocalizedResource.FILECACHE, LocalizedResource.FILESDIR); assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists()); assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists()); assertTrue("user filecache dir user3 not created", new File(expectedFileDirUser3).exists()); @@ -929,7 +822,7 @@ public class AsyncLocalizerTest { public void testUpdate() throws Exception { Map<String, Object> conf = new HashMap<>(); // set clean time really high so doesn't kick in - conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); + conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000); String key1 = "key1"; String topo1 = "topo1"; @@ -983,6 +876,105 @@ public class AsyncLocalizerTest { localizer.updateBlobs(); assertTrue("blob version file not created", versionFile.exists()); assertEquals("blob version not correct", 3, LocalizedResource.localVersionOfBlob(keyVersionFile)); - assertTrue("blob file with version 3 not created", new File(expectedFileDir, key1 + ".3").exists()); + assertTrue("blob file with version 3 not created", new File(expectedFileDir, key1 + ".3").exists()); + } + + class TestLocalizer extends AsyncLocalizer { + + TestLocalizer(Map<String, Object> conf, String baseDir) throws IOException { + super(conf, AdvancedFSOps.make(conf), baseDir); + } + + @Override + protected ClientBlobStore getClientBlobStore() { + return mockblobstore; + } + + synchronized void addReferences(List<LocalResource> localresource, PortAndAssignment pna, BlobChangingCallback cb) { + String user = pna.getOwner(); + for (LocalResource blob : localresource) { + ConcurrentMap<String, LocalizedResource> lrsrcSet = blob.shouldUncompress() ? userArchives.get(user) : userFiles.get(user); + if (lrsrcSet != null) { + LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName()); + if (lrsrc != null) { + lrsrc.addReference(pna, blob.needsCallback() ? cb : null); + LOG.debug("added reference for topo: {} key: {}", pna, blob); + } else { + LOG.warn("trying to add reference to non-existent blob, key: {} topo: {}", blob, pna); + } + } else { + LOG.warn("trying to add reference to non-existent local resource set, user: {} topo: {}", user, pna); + } + } + } + + void setTargetCacheSize(long size) { + cacheTargetSize = size; + } + + // For testing, be careful as it doesn't clone + ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> getUserFiles() { + return userFiles; + } + + ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> getUserArchives() { + return userArchives; + } + + /** + * This function either returns the blob in the existing cache or if it doesn't exist in the + * cache, it will download the blob and will block until the download is complete. + */ + LocalizedResource getBlob(LocalResource localResource, PortAndAssignment pna, BlobChangingCallback cb) + throws AuthorizationException, KeyNotFoundException, IOException { + ArrayList<LocalResource> arr = new ArrayList<>(); + arr.add(localResource); + List<LocalizedResource> results = getBlobs(arr, pna, cb); + if (results.isEmpty() || results.size() != 1) { + throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + pna.getOwner() + + ", topo: " + pna); + } + return results.get(0); + } + } + + class TestInputStreamWithMeta extends InputStreamWithMeta { + private final long version; + private InputStream iostream; + + public TestInputStreamWithMeta(long version) { + iostream = IOUtils.toInputStream("some test data for my input stream"); + this.version = version; + } + + public TestInputStreamWithMeta(InputStream istream, long version) { + iostream = istream; + this.version = version; + } + + @Override + public long getVersion() throws IOException { + return version; + } + + @Override + public synchronized int read() { + return 0; + } + + @Override + public synchronized int read(byte[] b) + throws IOException { + int length = iostream.read(b); + if (length == 0) { + return -1; + } + return length; + } + + @Override + public long getFileLength() { + return 0; + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java b/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java index 4ebf5cf..a6e8bfd 100644 --- a/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java +++ b/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.localizer; @@ -33,7 +28,9 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class LocalizedResourceRetentionSetTest { @@ -76,7 +73,7 @@ public class LocalizedResourceRetentionSetTest { @Test public void testCleanup() throws Exception { ClientBlobStore mockBlobstore = mock(ClientBlobStore.class); - when (mockBlobstore.getBlobMeta(any())).thenReturn(new ReadableBlobMeta(new SettableBlobMeta(), 1)); + when(mockBlobstore.getBlobMeta(any())).thenReturn(new ReadableBlobMeta(new SettableBlobMeta(), 1)); PortAndAssignment pna1 = new PortAndAssignment(1, new LocalAssignment("topo1", Collections.emptyList())); String user = "user"; Map<String, Object> conf = new HashMap<>(); @@ -86,15 +83,15 @@ public class LocalizedResourceRetentionSetTest { ConcurrentMap<String, LocalizedResource> lrArchives = new ConcurrentHashMap<>(); // no reference to key1 LocalizedResource localresource1 = new LocalizedResource("key1", Paths.get("./target/TESTING/testfile1"), false, ops, conf, - user); + user); localresource1.setSize(10); // no reference to archive1 LocalizedResource archiveresource1 = new LocalizedResource("archive1", Paths.get("./target/TESTING/testarchive1"), true, ops, - conf, user); + conf, user); archiveresource1.setSize(20); // reference to key2 LocalizedResource localresource2 = new LocalizedResource("key2", Paths.get("./target/TESTING/testfile2"), false, ops, conf, - user); + user); localresource2.addReference(pna1, null); // check adding reference to local resource with topology of same name localresource2.addReference(pna1, null); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/metric/ClusterMetricsConsumerExecutorTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/metric/ClusterMetricsConsumerExecutorTest.java b/storm-server/src/test/java/org/apache/storm/metric/ClusterMetricsConsumerExecutorTest.java index 0d7b520..dcb6a03 100644 --- a/storm-server/src/test/java/org/apache/storm/metric/ClusterMetricsConsumerExecutorTest.java +++ b/storm-server/src/test/java/org/apache/storm/metric/ClusterMetricsConsumerExecutorTest.java @@ -1,32 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metric; +import java.util.Collection; +import java.util.Collections; import org.apache.storm.metric.api.DataPoint; import org.apache.storm.metric.api.IClusterMetricsConsumer; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.Collection; -import java.util.Collections; - import static org.mockito.Mockito.mock; public class ClusterMetricsConsumerExecutorTest { @@ -39,7 +32,7 @@ public class ClusterMetricsConsumerExecutorTest { @Test public void testPrepareDoesNotThrowExceptionWhenInitializingClusterMetricsConsumerIsFailing() throws Exception { ClusterMetricsConsumerExecutor sut = new ClusterMetricsConsumerExecutor( - MockFailingClusterMetricsConsumer.class.getName(), 2); + MockFailingClusterMetricsConsumer.class.getName(), 2); // it shouldn't propagate any exceptions sut.prepare(); @@ -51,7 +44,7 @@ public class ClusterMetricsConsumerExecutorTest { @Test public void testHandleDataPointsWithClusterMetricsShouldSkipHandlingMetricsIfFailedBefore() throws Exception { ClusterMetricsConsumerExecutor sut = new ClusterMetricsConsumerExecutor( - MockFailingClusterMetricsConsumer.class.getName(), 2); + MockFailingClusterMetricsConsumer.class.getName(), 2); // below calls shouldn't propagate any exceptions sut.prepare(); @@ -66,7 +59,7 @@ public class ClusterMetricsConsumerExecutorTest { @Test public void testHandleDataPointsWithSupervisorMetricsShouldRetryInitializingClusterMetricsConsumerIfFailedBefore() throws Exception { ClusterMetricsConsumerExecutor sut = new ClusterMetricsConsumerExecutor( - MockFailingClusterMetricsConsumer.class.getName(), 2); + MockFailingClusterMetricsConsumer.class.getName(), 2); // below calls shouldn't propagate any exceptions sut.prepare(); @@ -84,28 +77,6 @@ public class ClusterMetricsConsumerExecutorTest { private static int handleDataPointsWithSupervisorInfoCallCount = 0; private static int cleanupCallCount = 0; - @Override - public void prepare(Object registrationArgument) { - prepareCallCount++; - - throw new RuntimeException("prepare failing..."); - } - - @Override - public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) { - handleDataPointsWithClusterInfoCallCount++; - } - - @Override - public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) { - handleDataPointsWithSupervisorInfoCallCount++; - } - - @Override - public void cleanup() { - cleanupCallCount++; - } - public static int getPrepareCallCount() { return prepareCallCount; } @@ -128,6 +99,28 @@ public class ClusterMetricsConsumerExecutorTest { handleDataPointsWithSupervisorInfoCallCount = 0; cleanupCallCount = 0; } + + @Override + public void prepare(Object registrationArgument) { + prepareCallCount++; + + throw new RuntimeException("prepare failing..."); + } + + @Override + public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) { + handleDataPointsWithClusterInfoCallCount++; + } + + @Override + public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) { + handleDataPointsWithSupervisorInfoCallCount++; + } + + @Override + public void cleanup() { + cleanupCallCount++; + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/nimbus/LocalNimbusTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/nimbus/LocalNimbusTest.java b/storm-server/src/test/java/org/apache/storm/nimbus/LocalNimbusTest.java index 64306a2..916fff0 100644 --- a/storm-server/src/test/java/org/apache/storm/nimbus/LocalNimbusTest.java +++ b/storm-server/src/test/java/org/apache/storm/nimbus/LocalNimbusTest.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.storm.nimbus; import java.util.ArrayList; @@ -42,6 +43,19 @@ import org.junit.Test; */ public class LocalNimbusTest { + public static StormTopology createTestTopology() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("words", new TestWordSpout(), generateParallelismHint()); + builder.setBolt("count", new TestWordCounter(), generateParallelismHint()).shuffleGrouping("words"); + builder.setBolt("globalCount", new TestGlobalCount(), generateParallelismHint()).shuffleGrouping("count"); + + return builder.createTopology(); + } + + private static int generateParallelismHint() { + return new Random().nextInt(9) + 1; + } + @Test public void testSubmitTopologyToLocalNimbus() throws Exception { int port = Utils.getAvailablePort(); @@ -68,19 +82,6 @@ public class LocalNimbusTest { } } - public static StormTopology createTestTopology() { - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("words", new TestWordSpout(), generateParallelismHint()); - builder.setBolt("count", new TestWordCounter(), generateParallelismHint()).shuffleGrouping("words"); - builder.setBolt("globalCount", new TestGlobalCount(), generateParallelismHint()).shuffleGrouping("count"); - - return builder.createTopology(); - } - - private static int generateParallelismHint() { - return new Random().nextInt(9)+1; - } - public static class InmemoryTopologySubmitterHook implements ISubmitterHook { public static final List<TopologyDetails> submittedTopologies = new ArrayList<>(); @@ -106,8 +107,9 @@ public class LocalNimbusTest { TopologyDetails that = (TopologyDetails) o; - if (topologyName != null ? !topologyName.equals(that.topologyName) : that.topologyName != null) + if (topologyName != null ? !topologyName.equals(that.topologyName) : that.topologyName != null) { return false; + } return !(stormTopology != null ? !stormTopology.equals(that.stormTopology) : that.stormTopology != null); } @@ -122,9 +124,9 @@ public class LocalNimbusTest { @Override public String toString() { return "TopologyDetails{" + - "topologyName='" + topologyName + '\'' + - ", stormTopology=" + stormTopology + - '}'; + "topologyName='" + topologyName + '\'' + + ", stormTopology=" + stormTopology + + '}'; } } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java index b568b04..3b9eb14 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.scheduler; import java.util.HashMap; import java.util.Map; - import org.apache.storm.Config; import org.junit.Assert; import org.junit.Test; @@ -99,8 +93,8 @@ public class ClusterTest { @Test public void getAssignedMemoryForSlot_topologyWorkerLwChildopts() { - singleValueTest(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m", - TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION + 64.0); + singleValueTest(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m", + TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION + 64.0); } @Test
