Repository: cassandra Updated Branches: refs/heads/trunk 560faba2f -> d43b9ce50
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java new file mode 100644 index 0000000..b94b6ee --- /dev/null +++ b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java @@ -0,0 +1,409 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.net; + +import java.net.InetAddress; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.RateLimiter; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.utils.TestTimeSource; +import org.apache.cassandra.utils.TimeSource; + +import static org.apache.cassandra.net.RateBasedBackPressure.FACTOR; +import static org.apache.cassandra.net.RateBasedBackPressure.FLOW; +import static org.apache.cassandra.net.RateBasedBackPressure.HIGH_RATIO; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class RateBasedBackPressureTest +{ + @Test(expected = IllegalArgumentException.class) + public void testAcceptsNoLessThanThreeArguments() throws Exception + { + new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "1"), new TestTimeSource(), 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testHighRatioMustBeBiggerThanZero() throws Exception + { + new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0", FACTOR, "2", FLOW, "FAST"), new TestTimeSource(), 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testHighRatioMustBeSmallerEqualThanOne() throws Exception + { + new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "2", FACTOR, "2", FLOW, "FAST"), new TestTimeSource(), 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testFactorMustBeBiggerEqualThanOne() throws Exception + { + new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "0", FLOW, "FAST"), new TestTimeSource(), 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testWindowSizeMustBeBiggerEqualThanTen() throws Exception + { + new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "5", FLOW, "FAST"), new TestTimeSource(), 1); + } + + @Test + public void testFlowMustBeEitherFASTorSLOW() throws Exception + { + new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "FAST"), new TestTimeSource(), 10); + new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "SLOW"), new TestTimeSource(), 10); + try + { + new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "WRONG"), new TestTimeSource(), 10); + fail("Expected to fail with wrong flow type."); + } + catch (Exception ex) + { + } + } + + @Test + public void testBackPressureStateUpdates() + { + long windowSize = 6000; + TestTimeSource timeSource = new TestTimeSource(); + RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); + + RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress()); + state.onMessageSent(null); + assertEquals(0, state.incomingRate.size()); + assertEquals(0, state.outgoingRate.size()); + + state = strategy.newState(InetAddress.getLoopbackAddress()); + state.onResponseReceived(); + assertEquals(1, state.incomingRate.size()); + assertEquals(1, state.outgoingRate.size()); + + state = strategy.newState(InetAddress.getLoopbackAddress()); + state.onResponseTimeout(); + assertEquals(0, state.incomingRate.size()); + assertEquals(1, state.outgoingRate.size()); + } + + @Test + public void testBackPressureIsNotUpdatedBeyondInfinity() throws Exception + { + long windowSize = 6000; + TestTimeSource timeSource = new TestTimeSource(); + RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); + RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress()); + + // Get initial rate: + double initialRate = state.rateLimiter.getRate(); + assertEquals(Double.POSITIVE_INFINITY, initialRate, 0.0); + + // Update incoming and outgoing rate equally: + state.incomingRate.update(1); + state.outgoingRate.update(1); + + // Move time ahead: + timeSource.sleep(windowSize, TimeUnit.MILLISECONDS); + + // Verify the rate doesn't change because already at infinity: + strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS); + assertEquals(initialRate, state.rateLimiter.getRate(), 0.0); + } + + @Test + public void testBackPressureIsUpdatedOncePerWindowSize() throws Exception + { + long windowSize = 6000; + TestTimeSource timeSource = new TestTimeSource(); + RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); + RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress()); + + // Get initial time: + long current = state.getLastIntervalAcquire(); + assertEquals(0, current); + + // Update incoming and outgoing rate: + state.incomingRate.update(1); + state.outgoingRate.update(1); + + // Move time ahead by window size: + timeSource.sleep(windowSize, TimeUnit.MILLISECONDS); + + // Verify the timestamp changed: + strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS); + current = state.getLastIntervalAcquire(); + assertEquals(timeSource.currentTimeMillis(), current); + + // Move time ahead by less than interval: + long previous = current; + timeSource.sleep(windowSize / 2, TimeUnit.MILLISECONDS); + + // Verify the last timestamp didn't change because below the window size: + strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS); + current = state.getLastIntervalAcquire(); + assertEquals(previous, current); + } + + @Test + public void testBackPressureWhenBelowHighRatio() throws Exception + { + long windowSize = 6000; + TestTimeSource timeSource = new TestTimeSource(); + RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); + RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress()); + + // Update incoming and outgoing rate so that the ratio is 0.5: + state.incomingRate.update(50); + state.outgoingRate.update(100); + + // Move time ahead: + timeSource.sleep(windowSize, TimeUnit.MILLISECONDS); + + // Verify the rate is decreased by factor: + strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS); + assertEquals(7.4, state.rateLimiter.getRate(), 0.1); + } + + @Test + public void testBackPressureRateLimiterIsIncreasedAfterGoingAgainAboveHighRatio() throws Exception + { + long windowSize = 6000; + TestTimeSource timeSource = new TestTimeSource(); + RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); + RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress()); + + // Update incoming and outgoing rate so that the ratio is 0.5: + state.incomingRate.update(50); + state.outgoingRate.update(100); + + // Move time ahead: + timeSource.sleep(windowSize, TimeUnit.MILLISECONDS); + + // Verify the rate decreased: + strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS); + assertEquals(7.4, state.rateLimiter.getRate(), 0.1); + + // Update incoming and outgoing rate back above high rate: + state.incomingRate.update(50); + state.outgoingRate.update(50); + + // Move time ahead: + timeSource.sleep(windowSize, TimeUnit.MILLISECONDS); + + // Verify rate limiter is increased by factor: + strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS); + assertEquals(8.25, state.rateLimiter.getRate(), 0.1); + + // Update incoming and outgoing rate to keep it below the limiter rate: + state.incomingRate.update(1); + state.outgoingRate.update(1); + + // Move time ahead: + timeSource.sleep(windowSize, TimeUnit.MILLISECONDS); + + // Verify rate limiter is not increased as already higher than the actual rate: + strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS); + assertEquals(8.25, state.rateLimiter.getRate(), 0.1); + } + + @Test + public void testBackPressureFastFlow() throws Exception + { + long windowSize = 6000; + TestTimeSource timeSource = new TestTimeSource(); + TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); + RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1")); + RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2")); + RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3")); + + // Update incoming and outgoing rates: + state1.incomingRate.update(50); + state1.outgoingRate.update(100); + state2.incomingRate.update(80); // fast + state2.outgoingRate.update(100); + state3.incomingRate.update(20); + state3.outgoingRate.update(100); + + // Move time ahead: + timeSource.sleep(windowSize, TimeUnit.MILLISECONDS); + + // Verify the fast replica rate limiting has been applied: + Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3); + strategy.apply(replicaGroup, 1, TimeUnit.SECONDS); + assertTrue(strategy.checkAcquired()); + assertTrue(strategy.checkApplied()); + assertEquals(12.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1); + } + + @Test + public void testBackPressureSlowFlow() throws Exception + { + long windowSize = 6000; + TestTimeSource timeSource = new TestTimeSource(); + TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize); + RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1")); + RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2")); + RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3")); + + // Update incoming and outgoing rates: + state1.incomingRate.update(50); + state1.outgoingRate.update(100); + state2.incomingRate.update(100); + state2.outgoingRate.update(100); + state3.incomingRate.update(20); // slow + state3.outgoingRate.update(100); + + // Move time ahead: + timeSource.sleep(windowSize, TimeUnit.MILLISECONDS); + + // Verify the slow replica rate limiting has been applied: + Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3); + strategy.apply(replicaGroup, 1, TimeUnit.SECONDS); + assertTrue(strategy.checkAcquired()); + assertTrue(strategy.checkApplied()); + assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1); + } + + @Test + public void testBackPressureWithDifferentGroups() throws Exception + { + long windowSize = 6000; + TestTimeSource timeSource = new TestTimeSource(); + TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize); + RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1")); + RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2")); + RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3")); + RateBasedBackPressureState state4 = strategy.newState(InetAddress.getByName("127.0.0.4")); + + // Update incoming and outgoing rates: + state1.incomingRate.update(50); // this + state1.outgoingRate.update(100); + state2.incomingRate.update(100); + state2.outgoingRate.update(100); + state3.incomingRate.update(20); // this + state3.outgoingRate.update(100); + state4.incomingRate.update(80); + state4.outgoingRate.update(100); + + // Move time ahead: + timeSource.sleep(windowSize, TimeUnit.MILLISECONDS); + + // Verify the first group: + Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2); + strategy.apply(replicaGroup, 1, TimeUnit.SECONDS); + assertTrue(strategy.checkAcquired()); + assertTrue(strategy.checkApplied()); + assertEquals(7.4, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1); + + // Verify the second group: + replicaGroup = Sets.newHashSet(state3, state4); + strategy.apply(replicaGroup, 1, TimeUnit.SECONDS); + assertTrue(strategy.checkAcquired()); + assertTrue(strategy.checkApplied()); + assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1); + } + + @Test + public void testBackPressurePastTimeout() throws Exception + { + long windowSize = 10000; + TestTimeSource timeSource = new TestTimeSource(); + TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize); + RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1")); + RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2")); + RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3")); + + // Update incoming and outgoing rates: + state1.incomingRate.update(5); // slow + state1.outgoingRate.update(100); + state2.incomingRate.update(100); + state2.outgoingRate.update(100); + state3.incomingRate.update(100); + state3.outgoingRate.update(100); + + // Move time ahead: + timeSource.sleep(windowSize, TimeUnit.MILLISECONDS); + + // Verify the slow replica rate limiting has been applied: + Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3); + strategy.apply(replicaGroup, 4, TimeUnit.SECONDS); + assertTrue(strategy.checkAcquired()); + assertTrue(strategy.checkApplied()); + assertEquals(0.5, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1); + + // Make one more apply call to saturate the rate limit timeout (0.5 requests per second means 2 requests span + // 4 seconds, but we can only make one as we have to subtract the incoming response time): + strategy.apply(replicaGroup, 4, TimeUnit.SECONDS); + + // Now verify another call to apply doesn't acquire the rate limit because of the max timeout of 4 seconds minus + // 2 seconds of response time, so the time source itself sleeps two second: + long start = timeSource.currentTimeMillis(); + strategy.apply(replicaGroup, 4, TimeUnit.SECONDS); + assertFalse(strategy.checkAcquired()); + assertTrue(strategy.checkApplied()); + assertEquals(TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS), + strategy.timeout); + assertEquals(strategy.timeout, + TimeUnit.NANOSECONDS.convert(timeSource.currentTimeMillis() - start, TimeUnit.MILLISECONDS)); + } + + public static class TestableBackPressure extends RateBasedBackPressure + { + public volatile boolean acquired = false; + public volatile boolean applied = false; + public volatile long timeout; + + public TestableBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize) + { + super(args, timeSource, windowSize); + } + + @Override + public boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos) + { + acquired = super.doRateLimit(rateLimiter, timeoutInNanos); + applied = true; + timeout = timeoutInNanos; + return acquired; + } + + public boolean checkAcquired() + { + boolean checked = acquired; + acquired = false; + return checked; + } + + public boolean checkApplied() + { + boolean checked = applied; + applied = false; + return checked; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java b/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java new file mode 100644 index 0000000..8c11f9d --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; + +public class SlidingTimeRateTest +{ + @Test + public void testUpdateAndGet() + { + SlidingTimeRate rate = new SlidingTimeRate(new TestTimeSource(), 10, 1, TimeUnit.SECONDS); + int updates = 100; + for (int i = 0; i < updates; i++) + { + rate.update(1); + } + Assert.assertEquals(updates, rate.get(TimeUnit.SECONDS), 0.0); + } + + @Test + public void testUpdateAndGetBetweenWindows() throws InterruptedException + { + TestTimeSource time = new TestTimeSource(); + SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS); + int updates = 100; + for (int i = 0; i < updates; i++) + { + rate.update(1); + time.sleep(100, TimeUnit.MILLISECONDS); + } + Assert.assertEquals(10, rate.get(TimeUnit.SECONDS), 0.0); + } + + @Test + public void testUpdateAndGetPastWindowSize() throws InterruptedException + { + TestTimeSource time = new TestTimeSource(); + SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS); + int updates = 100; + for (int i = 0; i < updates; i++) + { + rate.update(1); + } + + time.sleep(6, TimeUnit.SECONDS); + + Assert.assertEquals(0, rate.get(TimeUnit.SECONDS), 0.0); + } + + @Test + public void testUpdateAndGetToPointInTime() throws InterruptedException + { + TestTimeSource time = new TestTimeSource(); + SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS); + int updates = 10; + for (int i = 0; i < updates; i++) + { + rate.update(1); + time.sleep(100, TimeUnit.MILLISECONDS); + } + + time.sleep(1, TimeUnit.SECONDS); + + Assert.assertEquals(5, rate.get(TimeUnit.SECONDS), 0.0); + Assert.assertEquals(10, rate.get(1, TimeUnit.SECONDS), 0.0); + } + + @Test + public void testDecay() throws InterruptedException + { + TestTimeSource time = new TestTimeSource(); + SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS); + int updates = 10; + for (int i = 0; i < updates; i++) + { + rate.update(1); + time.sleep(100, TimeUnit.MILLISECONDS); + } + Assert.assertEquals(10, rate.get(TimeUnit.SECONDS), 0.0); + + time.sleep(1, TimeUnit.SECONDS); + + Assert.assertEquals(5, rate.get(TimeUnit.SECONDS), 0.0); + + time.sleep(2, TimeUnit.SECONDS); + + Assert.assertEquals(2.5, rate.get(TimeUnit.SECONDS), 0.0); + } + + @Test + public void testPruning() throws InterruptedException + { + TestTimeSource time = new TestTimeSource(); + SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS); + + rate.update(1); + Assert.assertEquals(1, rate.size()); + + time.sleep(6, TimeUnit.SECONDS); + + rate.prune(); + Assert.assertEquals(0, rate.size()); + } + + @Test + public void testConcurrentUpdateAndGet() throws InterruptedException + { + final ExecutorService executor = Executors.newFixedThreadPool(FBUtilities.getAvailableProcessors()); + final TestTimeSource time = new TestTimeSource(); + final SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS); + int updates = 100000; + for (int i = 0; i < updates; i++) + { + executor.submit(() -> { + time.sleep(1, TimeUnit.MILLISECONDS); + rate.update(1); + }); + } + + executor.shutdown(); + + Assert.assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); + Assert.assertEquals(1000, rate.get(TimeUnit.SECONDS), 100.0); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/utils/TestTimeSource.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/TestTimeSource.java b/test/unit/org/apache/cassandra/utils/TestTimeSource.java new file mode 100644 index 0000000..4ecd086 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/TestTimeSource.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class TestTimeSource implements TimeSource +{ + private final AtomicLong timeInMillis = new AtomicLong(System.currentTimeMillis()); + + @Override + public long currentTimeMillis() + { + return timeInMillis.get(); + } + + @Override + public long nanoTime() + { + return timeInMillis.get() * 1_000_000; + } + + @Override + public TimeSource sleep(long sleepFor, TimeUnit unit) + { + long current = timeInMillis.get(); + long sleepInMillis = TimeUnit.MILLISECONDS.convert(sleepFor, unit); + boolean elapsed; + do + { + long newTime = current + sleepInMillis; + elapsed = timeInMillis.compareAndSet(current, newTime); + if (!elapsed) + { + long updated = timeInMillis.get(); + if (updated - current >= sleepInMillis) + { + elapsed = true; + } + else + { + sleepInMillis -= updated - current; + current = updated; + } + } + } + while (!elapsed); + return this; + } + + @Override + public TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit) + { + return sleep(sleepFor, unit); + } +}