Repository: curator Updated Branches: refs/heads/CURATOR-3.0 41a700429 -> 1735000d9
Parent container deletions could cause a hang because the watcher would never fire. Now, if the parent containers are deleted, reset and recreate them. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9261ee62 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9261ee62 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9261ee62 Branch: refs/heads/CURATOR-3.0 Commit: 9261ee622e2e2dd7b16969561cb597712f078acd Parents: 48e2c15 Author: randgalt <[email protected]> Authored: Fri May 20 15:39:36 2016 -0500 Committer: randgalt <[email protected]> Committed: Fri May 20 15:39:36 2016 -0500 ---------------------------------------------------------------------- .../curator/framework/EnsureContainers.java | 5 ++ .../recipes/queue/SimpleDistributedQueue.java | 14 ++- .../queue/TestSimpleDistributedQueue.java | 89 ++++++++++++++++++++ 3 files changed, 106 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/9261ee62/curator-framework/src/main/java/org/apache/curator/framework/EnsureContainers.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/EnsureContainers.java b/curator-framework/src/main/java/org/apache/curator/framework/EnsureContainers.java index 697df62..b002b90 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/EnsureContainers.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/EnsureContainers.java @@ -54,6 +54,11 @@ public class EnsureContainers } } + public void reset() + { + ensureNeeded.set(true); + } + private synchronized void internalEnsure() throws Exception { if ( ensureNeeded.compareAndSet(true, false) ) 
http://git-wip-us.apache.org/repos/asf/curator/blob/9261ee62/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java index 35afb53..c80ad36 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java @@ -200,7 +200,17 @@ public class SimpleDistributedQueue latch.countDown(); } }; - byte[] bytes = internalElement(true, watcher); + byte[] bytes = new byte[0]; + try + { + bytes = internalElement(true, watcher); + } + catch ( NoSuchElementException dummy ) + { + log.debug("Parent containers appear to have lapsed - recreate and retry"); + ensureContainers.reset(); + continue; + } if ( bytes != null ) { return bytes; @@ -234,7 +244,7 @@ public class SimpleDistributedQueue } catch ( KeeperException.NoNodeException dummy ) { - return null; + throw new NoSuchElementException(); } Collections.sort(nodes); http://git-wip-us.apache.org/repos/asf/curator/blob/9261ee62/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestSimpleDistributedQueue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestSimpleDistributedQueue.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestSimpleDistributedQueue.java index dab1674..ec4c3d1 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestSimpleDistributedQueue.java +++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestSimpleDistributedQueue.java @@ -19,6 +19,8 @@ package org.apache.curator.framework.recipes.queue; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.TestingServer; +import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -26,6 +28,8 @@ import org.apache.curator.retry.RetryOneTime; import org.testng.Assert; import org.testng.annotations.Test; import java.util.NoSuchElementException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.testng.Assert.assertEquals; @@ -33,6 +37,91 @@ import static org.testng.Assert.assertTrue; public class TestSimpleDistributedQueue extends BaseClassForTests { + private static abstract class QueueUser implements Runnable + { + private static final String QUEUE_PATH = "/queue"; + private static final int ITEM_COUNT = 10; + + protected final SimpleDistributedQueue queue; + private final int sleepMillis; + + public QueueUser(CuratorFramework curator, int sleepMillis) + { + this.queue = new SimpleDistributedQueue(curator, QUEUE_PATH); + this.sleepMillis = sleepMillis; + } + + @Override + public void run() + { + try + { + for ( int i = 0; i < ITEM_COUNT; i++ ) + { + processItem(i); + Thread.sleep(sleepMillis); + } + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + + protected abstract void processItem(int itemNumber) throws Exception; + } + + @Test + public void testHangFromContainerLoss() throws Exception + { + // for CURATOR-308 + + server.close(); + System.setProperty("znode.container.checkIntervalMs", "100"); + server = new TestingServer(); + + Timing timing = new Timing().multiple(.1); + CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + + ExecutorService executor = Executors.newFixedThreadPool(2); + executor.execute(new QueueUser(client, timing.milliseconds()) + { + @Override + protected void processItem(int itemNumber) throws Exception + { + System.out.println("Offering item"); + queue.offer(new byte[]{(byte)itemNumber}); + } + }); + + executor.execute(new QueueUser(client, timing.multiple(.5).milliseconds()) + { + @Override + protected void processItem(int itemNumber) throws Exception + { + System.out.println("Taking item " + itemNumber); + byte[] item = queue.take(); + if ( item == null ) + { + throw new IllegalStateException("Null result for item " + itemNumber); + } + System.out.println("Got item " + item[0]); + } + }); + + executor.shutdown(); + Assert.assertTrue(executor.awaitTermination((QueueUser.ITEM_COUNT * 2) * timing.milliseconds(), TimeUnit.MILLISECONDS)); + } + finally + { + CloseableUtils.closeQuietly(client); + System.clearProperty("znode.container.checkIntervalMs"); + } + } + @Test public void testPollWithTimeout() throws Exception {
