Repository: curator
Updated Branches:
  refs/heads/CURATOR-452 [created] cc30b67c8
Fix for CURATOR-452 race in ServiceCacheImpl's start() method caused by an optimization whereby it clears the dataBytes of its internal PathChildrenCache - was causing an intermittent NPE Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/cc30b67c Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/cc30b67c Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/cc30b67c Branch: refs/heads/CURATOR-452 Commit: cc30b67c8dbe24babd31ed654e9536fe776b8a18 Parents: a1f620e Author: randgalt <[email protected]> Authored: Sun Jun 24 09:08:02 2018 -0500 Committer: randgalt <[email protected]> Committed: Sun Jun 24 09:08:02 2018 -0500 ---------------------------------------------------------------------- .../x/discovery/details/ServiceCacheImpl.java | 22 +++- .../discovery/details/TestServiceCacheRace.java | 110 +++++++++++++++++++ 2 files changed, 131 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/cc30b67c/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index b8f39d5..d1a31ad 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -18,6 +18,7 @@ */ package org.apache.curator.x.discovery.details; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -36,6 +37,7 @@ import 
org.apache.curator.x.discovery.ServiceInstance; import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -85,15 +87,33 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi return Lists.newArrayList(instances.values()); } + @VisibleForTesting + volatile CountDownLatch debugStartLatch = null; + volatile CountDownLatch debugStartWaitLatch = null; + @Override public void start() throws Exception { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); cache.start(true); + if ( debugStartLatch != null ) + { + debugStartLatch.countDown(); + debugStartLatch = null; + } + if ( debugStartWaitLatch != null ) + { + debugStartWaitLatch.await(); + debugStartWaitLatch = null; + } + for ( ChildData childData : cache.getCurrentData() ) { - addInstance(childData, true); + if ( childData.getData() != null ) // else already processed by the cache listener + { + addInstance(childData, true); + } } discovery.cacheOpened(this); } http://git-wip-us.apache.org/repos/asf/curator/blob/cc30b67c/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java new file mode 100644 index 0000000..08a2a8e --- /dev/null +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java @@ -0,0 +1,110 @@ +package org.apache.curator.x.discovery.details; + +import com.google.common.collect.Lists; +import org.apache.curator.framework.CuratorFramework; +import 
org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableExecutorService; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.ServiceCache; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.io.Closeable; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; + +public class TestServiceCacheRace extends BaseClassForTests +{ + private final Timing timing = new Timing(); + + // validates CURATOR-452 which exposed a race in ServiceCacheImpl's start() method caused by an optimization whereby it clears the dataBytes of its internal PathChildrenCache + @Test + public void testRaceOnInitialLoad() throws Exception + { + List<Closeable> closeables = Lists.newArrayList(); + try + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build(); + closeables.add(discovery); + discovery.start(); + + CountDownLatch cacheStartLatch = new CountDownLatch(1); + CountDownLatch cacheWaitLatch = new CountDownLatch(1); + final ServiceCache<String> cache = discovery.serviceCacheBuilder().name("test").build(); + closeables.add(cache); + ((ServiceCacheImpl)cache).debugStartLatch = cacheStartLatch; // causes ServiceCacheImpl.start to notify just after starting its internal 
PathChildrenCache + ((ServiceCacheImpl)cache).debugStartWaitLatch = cacheWaitLatch; // causes ServiceCacheImpl.start to wait before iterating over its internal PathChildrenCache + + ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("test").name("test").port(10064).build(); + discovery.registerService(instance1); + + CloseableExecutorService closeableExecutorService = new CloseableExecutorService(Executors.newSingleThreadExecutor()); + closeables.add(closeableExecutorService); + final CountDownLatch startCompletedLatch = new CountDownLatch(1); + Runnable proc = new Runnable() + { + @Override + public void run() + { + try + { + cache.start(); + startCompletedLatch.countDown(); + } + catch ( Exception e ) + { + LoggerFactory.getLogger(getClass()).error("Start failed", e); + throw new RuntimeException(e); + } + } + }; + closeableExecutorService.submit(proc); + Assert.assertTrue(timing.awaitLatch(cacheStartLatch)); // wait until ServiceCacheImpl's internal PathChildrenCache is started and primed + + final CountDownLatch cacheChangedLatch = new CountDownLatch(1); + ServiceCacheListener listener = new ServiceCacheListener() + { + @Override + public void cacheChanged() + { + cacheChangedLatch.countDown(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + // NOP + } + }; + cache.addListener(listener); + ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("test").name("test").port(10065).build(); + discovery.registerService(instance2); // cause ServiceCacheImpl's internal PathChildrenCache listener to get called which will clear the dataBytes + Assert.assertTrue(timing.awaitLatch(cacheChangedLatch)); + + cacheWaitLatch.countDown(); + + Assert.assertTrue(timing.awaitLatch(startCompletedLatch)); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } +}
