STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: time changes
Copy Time.java from 1.x-branch to allow use of nanoTime() in storm-kafka-client, and also update SlotTest to use try-with-resources since the new Time implementation ditched startSimulatingAutoAdvanceOnSleep(). This was a selective cherry-pick of a03137ed, retaining only those changes needed. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/29fc006d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/29fc006d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/29fc006d Branch: refs/heads/1.0.x-branch Commit: 29fc006d379c055fc69c65be47de8e4229987d6a Parents: 6d92df3 Author: Erik Weathers <eri...@gmail.com> Authored: Tue Feb 6 20:02:43 2018 -0800 Committer: Erik Weathers <eri...@gmail.com> Committed: Wed Feb 7 18:53:24 2018 -0800 ---------------------------------------------------------------------- .../src/jvm/org/apache/storm/utils/Time.java | 183 +++++++++++++------ .../storm/daemon/supervisor/SlotTest.java | 31 +--- 2 files changed, 139 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/29fc006d/storm-core/src/jvm/org/apache/storm/utils/Time.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java index e501b6c..a6a4fe1 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Time.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java @@ -24,38 +24,67 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +/** + * This class implements time simulation support. When time simulation is enabled, methods on this class will use fixed time. + * When time simulation is disabled, methods will pass through to relevant java.lang.System/java.lang.Thread calls. 
+ * Methods using units higher than nanoseconds will pass through to System.currentTimeMillis(). Methods supporting nanoseconds will pass through to System.nanoTime(). + */ public class Time { public static final Logger LOG = LoggerFactory.getLogger(Time.class); private static AtomicBoolean simulating = new AtomicBoolean(false); - private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0); + private static AtomicLong autoAdvanceNanosOnSleep = new AtomicLong(0); //TODO: should probably use weak references here or something - private static volatile Map<Thread, AtomicLong> threadSleepTimes; + private static volatile Map<Thread, AtomicLong> threadSleepTimesNanos; private static final Object sleepTimesLock = new Object(); + private static AtomicLong simulatedCurrTimeNanos; - private static AtomicLong simulatedCurrTimeMs; //should this be a thread local that's allowed to keep advancing? - - public static void startSimulating() { - synchronized(sleepTimesLock) { - simulating.set(true); - simulatedCurrTimeMs = new AtomicLong(0); - threadSleepTimes = new ConcurrentHashMap<>(); + public static class SimulatedTime implements AutoCloseable { + + public SimulatedTime() { + this(null); + } + + public SimulatedTime(Number advanceTimeMs) { + synchronized(Time.sleepTimesLock) { + Time.simulating.set(true); + Time.simulatedCurrTimeNanos = new AtomicLong(0); + Time.threadSleepTimesNanos = new ConcurrentHashMap<>(); + if (advanceTimeMs != null) { + Time.autoAdvanceNanosOnSleep.set(millisToNanos(advanceTimeMs.longValue())); + } + LOG.warn("AutoCloseable Simulated Time Starting..."); + } + } + + @Override + public void close() { + synchronized(Time.sleepTimesLock) { + Time.simulating.set(false); + Time.autoAdvanceNanosOnSleep.set(0); + Time.threadSleepTimesNanos = null; + LOG.warn("AutoCloseable Simulated Time Ending..."); + } } } - public static void startSimulatingAutoAdvanceOnSleep(long ms) { - synchronized(sleepTimesLock) { - startSimulating(); - autoAdvanceOnSleep.set(ms); + 
@Deprecated + public static void startSimulating() { + synchronized(Time.sleepTimesLock) { + Time.simulating.set(true); + Time.simulatedCurrTimeNanos = new AtomicLong(0); + Time.threadSleepTimesNanos = new ConcurrentHashMap<>(); + LOG.warn("Simulated Time Starting..."); } } + @Deprecated public static void stopSimulating() { - synchronized(sleepTimesLock) { - simulating.set(false); - autoAdvanceOnSleep.set(0); - threadSleepTimes = null; + synchronized(Time.sleepTimesLock) { + Time.simulating.set(false); + Time.autoAdvanceNanosOnSleep.set(0); + Time.threadSleepTimesNanos = null; + LOG.warn("Simulated Time Ending..."); } } @@ -65,44 +94,66 @@ public class Time { public static void sleepUntil(long targetTimeMs) throws InterruptedException { if(simulating.get()) { - try { - synchronized(sleepTimesLock) { - if (threadSleepTimes == null) { + simulatedSleepUntilNanos(millisToNanos(targetTimeMs)); + } else { + long sleepTimeMs = targetTimeMs - currentTimeMillis(); + if(sleepTimeMs>0) { + Thread.sleep(sleepTimeMs); + } + } + } + + public static void sleepUntilNanos(long targetTimeNanos) throws InterruptedException { + if(simulating.get()) { + simulatedSleepUntilNanos(targetTimeNanos); + } else { + long sleepTimeNanos = targetTimeNanos-nanoTime(); + long sleepTimeMs = nanosToMillis(sleepTimeNanos); + int sleepTimeNanosSansMs = (int)(sleepTimeNanos%1_000_000); + if(sleepTimeNanos>0) { + Thread.sleep(sleepTimeMs, sleepTimeNanosSansMs); + } + } + } + + private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException { + try { + synchronized (sleepTimesLock) { + if (threadSleepTimesNanos == null) { + LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE")); + throw new InterruptedException(); + } + threadSleepTimesNanos.put(Thread.currentThread(), new AtomicLong(targetTimeNanos)); + } + while (simulatedCurrTimeNanos.get() < targetTimeNanos) { + synchronized (sleepTimesLock) { + if 
(threadSleepTimesNanos == null) { LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE")); throw new InterruptedException(); } - threadSleepTimes.put(Thread.currentThread(), new AtomicLong(targetTimeMs)); } - while(simulatedCurrTimeMs.get() < targetTimeMs) { - synchronized(sleepTimesLock) { - if (threadSleepTimes == null) { - LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE")); - throw new InterruptedException(); - } - } - long autoAdvance = autoAdvanceOnSleep.get(); - if (autoAdvance > 0) { - advanceTime(autoAdvance); - } - Thread.sleep(10); + long autoAdvance = autoAdvanceNanosOnSleep.get(); + if (autoAdvance > 0) { + advanceTimeNanos(autoAdvance); } - } finally { - synchronized(sleepTimesLock) { - if (simulating.get() && threadSleepTimes != null) { - threadSleepTimes.remove(Thread.currentThread()); - } + Thread.sleep(10); + } + } finally { + synchronized (sleepTimesLock) { + if (simulating.get() && threadSleepTimesNanos != null) { + threadSleepTimesNanos.remove(Thread.currentThread()); } } - } else { - long sleepTime = targetTimeMs-currentTimeMillis(); - if(sleepTime>0) - Thread.sleep(sleepTime); } } public static void sleep(long ms) throws InterruptedException { sleepUntil(currentTimeMillis()+ms); } + + public static void sleepNanos(long nanos) throws InterruptedException { + sleepUntilNanos(nanoTime() + nanos); + } public static void sleepSecs (long secs) throws InterruptedException { if (secs > 0) { @@ -110,14 +161,30 @@ public class Time { } } + public static long nanoTime() { + if (simulating.get()) { + return simulatedCurrTimeNanos.get(); + } else { + return System.nanoTime(); + } + } + public static long currentTimeMillis() { if(simulating.get()) { - return simulatedCurrTimeMs.get(); + return nanosToMillis(simulatedCurrTimeNanos.get()); } else { return System.currentTimeMillis(); } } + public static long 
nanosToMillis(long nanos) { + return nanos/1_000_000; + } + + public static long millisToNanos(long millis) { + return millis*1_000_000; + } + public static long secsToMillis (int secs) { return 1000*(long) secs; } @@ -139,18 +206,32 @@ public class Time { } public static void advanceTime(long ms) { - if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode"); - if (ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument"); - long newTime = simulatedCurrTimeMs.addAndGet(ms); - LOG.warn("Advanced simulated time to {}", newTime); + advanceTimeNanos(millisToNanos(ms)); + } + + public static void advanceTimeNanos(long nanos) { + if (!simulating.get()) { + throw new IllegalStateException("Cannot simulate time unless in simulation mode"); + } + if (nanos < 0) { + throw new IllegalArgumentException("advanceTime only accepts positive time as an argument"); + } + long newTime = simulatedCurrTimeNanos.addAndGet(nanos); + LOG.debug("Advanced simulated time to {}", newTime); + } + + public static void advanceTimeSecs(long secs) { + advanceTime(secs * 1_000); } public static boolean isThreadWaiting(Thread t) { - if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode"); + if(!simulating.get()) { + throw new IllegalStateException("Must be in simulation mode"); + } AtomicLong time; synchronized(sleepTimesLock) { - time = threadSleepTimes.get(t); + time = threadSleepTimesNanos.get(t); } - return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue(); + return !t.isAlive() || time!=null && nanoTime() < time.longValue(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/29fc006d/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java 
b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java index 24ccda5..9cd85f8 100644 --- a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java +++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java @@ -43,6 +43,7 @@ import org.apache.storm.localizer.ILocalizer; import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.LocalState; import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; import org.junit.Test; public class SlotTest { @@ -113,8 +114,7 @@ public class SlotTest { @Test public void testEmptyToEmpty() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { ILocalizer localizer = mock(ILocalizer.class); LocalState state = mock(LocalState.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); @@ -125,15 +125,12 @@ public class SlotTest { DynamicState nextState = Slot.handleEmpty(dynamicState, staticState); assertEquals(MachineState.EMPTY, nextState.state); assertTrue(Time.currentTimeMillis() > 1000); - } finally { - Time.stopSimulating(); } } @Test public void testLaunchContainerFromEmpty() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { int port = 8080; String topoId = "NEW"; List<ExecutorInfo> execList = mkExecutorInfoList(1,2,3,4,5); @@ -210,16 +207,13 @@ public class SlotTest { assertSame(newAssignment, nextState.currentAssignment); assertSame(container, nextState.container); assertTrue(Time.currentTimeMillis() > 2000); - } finally { - Time.stopSimulating(); } } @Test public void testRelaunch() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { int port = 8080; String topoId = "CURRENT"; List<ExecutorInfo> execList = mkExecutorInfoList(1,2,3,4,5); @@ -260,15 +254,12 @@ 
public class SlotTest { nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); - } finally { - Time.stopSimulating(); } } @Test public void testReschedule() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { int port = 8080; String cTopoId = "CURRENT"; List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5); @@ -368,16 +359,13 @@ public class SlotTest { assertSame(nAssignment, nextState.currentAssignment); assertSame(nContainer, nextState.container); assertTrue(Time.currentTimeMillis() > 4000); - } finally { - Time.stopSimulating(); } } @Test public void testRunningToEmpty() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { int port = 8080; String cTopoId = "CURRENT"; List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5); @@ -432,15 +420,12 @@ public class SlotTest { assertEquals(null, nextState.container); assertEquals(null, nextState.currentAssignment); assertTrue(Time.currentTimeMillis() > 3000); - } finally { - Time.stopSimulating(); } } @Test public void testRunWithProfileActions() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { int port = 8080; String cTopoId = "CURRENT"; List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5); @@ -508,8 +493,6 @@ public class SlotTest { assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.pendingStopProfileActions); assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.profileActions); assertTrue(Time.currentTimeMillis() > 5000); - } finally { - Time.stopSimulating(); } } }