Repository: curator Updated Branches: refs/heads/CURATOR-116 [created] d062906bc
CURATOR-116 - Modified sorting of children to be deterministic. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1103f476 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1103f476 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1103f476 Branch: refs/heads/CURATOR-116 Commit: 1103f476d84a53784377a96f8661bd9219e7412d Parents: 6e98562 Author: Cameron McKenzie <[email protected]> Authored: Wed Jun 25 11:53:00 2014 +1000 Committer: Cameron McKenzie <[email protected]> Committed: Wed Jun 25 11:53:00 2014 +1000 ---------------------------------------------------------------------- .../recipes/queue/DistributedDelayQueue.java | 12 +++- .../queue/TestDistributedDelayQueue.java | 67 +++++++++++++++++++- 2 files changed, 76 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/1103f476/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java index b84471f..bd90e71 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java @@ -77,14 +77,22 @@ public class DistributedDelayQueue<T> implements Closeable, QueueBase<T> finalFlushMs ) { + @Override protected long getDelay(String itemNode) { + return getDelay(itemNode, System.currentTimeMillis()); + } + + protected long getDelay(String itemNode, long sortTime) + { long epoch = getEpoch(itemNode); - return epoch - System.currentTimeMillis(); + return epoch - sortTime; } + @Override protected void sortChildren(List<String> children) { + final long sortTime = System.currentTimeMillis(); Collections.sort ( children, @@ -93,7 +101,7 @@ public class DistributedDelayQueue<T> implements Closeable, QueueBase<T> @Override public int compare(String o1, String o2) { - long diff = getDelay(o1) - getDelay(o2); + long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime); return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0); } } http://git-wip-us.apache.org/repos/asf/curator/blob/1103f476/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java index d6b592f..3759c34 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java @@ -28,6 +28,12 @@ import org.apache.curator.test.Timing; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -107,7 +113,7 @@ public class TestDistributedDelayQueue extends BaseClassForTests queue.start(); Random random = new Random(); - for ( int i = 0; i < 10; ++i ) + for ( int i = 0; i < QTY; ++i ) { long delay = System.currentTimeMillis() + random.nextInt(100); queue.put(delay, delay); @@ -128,6 +134,65 @@ public class TestDistributedDelayQueue extends BaseClassForTests CloseableUtils.closeQuietly(client); } } + + @Test + public void testSorting() throws Exception + { + //Need to use a fairly large number to ensure that sorting can take some time. + final int QTY = 1000; + + Timing timing = new Timing(); + DistributedDelayQueue<Long> queue = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + try + { + BlockingQueueConsumer<Long> consumer = new BlockingQueueConsumer<Long>(Mockito.mock(ConnectionStateListener.class)); + queue = QueueBuilder.builder(client, consumer, new LongSerializer(), "/test2").putInBackground(false).buildDelayQueue(); + queue.start(); + + Map<Long, Long> data = new HashMap<Long, Long>(); + + //Make the earliest a second into the future, so we can ensure that everything's + //been added prior to the consumption starting. Otherwise it's possible to start + //processing entries before they've all been added so the ordering will be + //incorrect. + long delay = System.currentTimeMillis() + 5000; + for ( long i = 0; i < QTY; ++i ) + { + data.put(delay, i); + + //We want to make the elements close together but not exactly the same MS. + delay += 1; + } + + //Randomly sort the list + List<Long> keys = new ArrayList<Long>(data.keySet()); + Collections.shuffle(keys); + + //Put the messages onto the queue in random order, but with the appropriate + //delay and value + for ( Long key : keys ) + { + queue.put(data.get(key), key); + } + + long lastValue = -1; + for ( int i = 0; i < QTY; ++i ) + { + Long value = consumer.take(6, TimeUnit.SECONDS); + Assert.assertNotNull(value); + Assert.assertEquals(value, new Long(lastValue + 1)); + lastValue = value; + } + } + finally + { + CloseableUtils.closeQuietly(queue); + CloseableUtils.closeQuietly(client); + } + } + private static class LongSerializer implements QueueSerializer<Long> {
