[GEODE-2324] Dunit test for AcceptorImpl.close() * Add AcceptorImplObserver * Add AccceptorImplDunitTest
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/1d561dcc Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/1d561dcc Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/1d561dcc Branch: refs/heads/develop Commit: 1d561dccaf8736972c6d355244b0fe54b9bd7a8b Parents: cae580f Author: Galen O'Sullivan <gosulli...@pivotal.io> Authored: Mon Jan 23 21:49:35 2017 -0800 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Thu Feb 9 14:28:48 2017 -0800 ---------------------------------------------------------------------- .../cache/tier/sockets/AcceptorImpl.java | 12 ++ .../sockets/command/AcceptorImplObserver.java | 47 ++++++ .../tier/sockets/AcceptorImplDUnitTest.java | 164 +++++++++++++++++++ .../tier/sockets/AcceptorImplJUnitTest.java | 42 ++--- 4 files changed, 239 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/1d561dcc/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index 37438b6..d72c72b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLException; +import org.apache.geode.internal.cache.tier.sockets.command.AcceptorImplObserver; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; @@ -1528,8 +1529,12 @@ public class AcceptorImpl extends Acceptor implements Runnable { @Override public void close() { + AcceptorImplObserver acceptorImplObserver = AcceptorImplObserver.getInstance(); try { synchronized (syncLock) { + if (acceptorImplObserver != null) { + acceptorImplObserver.beforeClose(this); + } if (!isRunning()) { return; } @@ -1608,9 +1613,16 @@ public class AcceptorImpl extends Acceptor implements Runnable { } } } + if (acceptorImplObserver != null) { + acceptorImplObserver.normalCloseTermination(this); + } } // synchronized } catch (RuntimeException e) {/* ignore and log */ logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED), e); + } finally { + if (acceptorImplObserver != null) { + acceptorImplObserver.afterClose(this); + } } } http://git-wip-us.apache.org/repos/asf/geode/blob/1d561dcc/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java new file mode 100644 index 0000000..f5ee982 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets.command; + +import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; + +/** + * AcceptorImplObserver is an observer/visitor for AcceptorImpl that is used for testing. + */ +public abstract class AcceptorImplObserver { + private static AcceptorImplObserver instance; + + /** + * Set the instance of the observer. Setting to null will clear the observer. + * + * @param instance + * @return the old observer, or null if there was no old observer. + */ + public static final AcceptorImplObserver setInstance(AcceptorImplObserver instance) { + AcceptorImplObserver oldInstance = AcceptorImplObserver.instance; + AcceptorImplObserver.instance = instance; + return oldInstance; + } + + public static final AcceptorImplObserver getInstance() { + return instance; + } + + public void beforeClose(AcceptorImpl acceptorImpl) {} + + public void normalCloseTermination(AcceptorImpl acceptorImpl) {} + + public void afterClose(AcceptorImpl acceptorImpl) {} +} http://git-wip-us.apache.org/repos/asf/geode/blob/1d561dcc/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java new file mode 100644 index 0000000..8b4c672 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheWriterAdapter; +import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.sockets.command.AcceptorImplObserver; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; + +import static org.junit.Assert.*; + +/** + * Tests for AcceptorImpl. + */ +@Category(DistributedTest.class) +public class AcceptorImplDUnitTest extends JUnit4DistributedTestCase { + private static Cache cache; + + public AcceptorImplDUnitTest() { + super(); + } + + @Override + public void postTearDown() throws Exception { + if (cache != null) { + cache.close(); + cache = null; + } + super.postTearDown(); + } + + public static class SleepyCacheWriter<K, V> extends CacheWriterAdapter<K, V> { + @Override + public void beforeCreate(EntryEvent<K, V> event) { + while (true) { + System.out.println("Sleeping a long time."); + try { + Thread.sleep(100000000); + } catch (InterruptedException ignore) { + } + } + } + } + + /** + * Dump threads to standard out. For debugging. + */ + private void dumpThreads() { + ThreadMXBean bean = ManagementFactory.getThreadMXBean(); + ThreadInfo[] infos = bean.dumpAllThreads(true, true); + System.out.println("infos = " + Arrays.toString(infos)); + } + + /** + * GEODE-2324. There was a bug where, due to an uncaught exception, `AcceptorImpl.close()` was + * short-circuiting and failing to clean up properly. + * + * What this test does is start a Cache and hook the Acceptor to interrupt the thread before the + * place where an InterruptedException could be thrown. It interrupts the thread, and checks that + * the thread has terminated normally without short-circuiting. It doesn't check that every part + * of the AcceptorImpl has shut down properly -- that seems both difficult to check (especially + * since the fields are private) and implementation-dependent. + */ + @Test + public void testShutdownCatchesException() throws Exception { + final String hostname = Host.getHost(0).getHostName(); + final VM clientVM = Host.getHost(0).getVM(0); + + // AtomicBooleans can be set from wherever they are, including an anonymous class or other + // thread. + AtomicBoolean terminatedNormally = new AtomicBoolean(false); + AtomicBoolean passedPostConditions = new AtomicBoolean(false); + + Properties props = new Properties(); + props.setProperty(MCAST_PORT, "0"); + + AcceptorImplObserver.setInstance(new AcceptorImplObserver() { + @Override + public void beforeClose(AcceptorImpl acceptorImpl) { + Thread.currentThread().interrupt(); + } + + @Override + public void normalCloseTermination(AcceptorImpl acceptorImpl) { + terminatedNormally.set(true); + } + + @Override + public void afterClose(AcceptorImpl acceptorImpl) { + passedPostConditions.set(!acceptorImpl.isRunning()); + } + }); + + try (InternalCache cache = (InternalCache) new CacheFactory(props).create()) { + RegionFactory<Object, Object> regionFactory = + cache.createRegionFactory(RegionShortcut.PARTITION); + + regionFactory.setCacheWriter(new SleepyCacheWriter<>()); + + final CacheServer server = cache.addCacheServer(); + final int port = AvailablePortHelper.getRandomAvailableTCPPort(); + server.setPort(port); + server.start(); + + regionFactory.create("region1"); + + clientVM.invokeAsync(() -> { + ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); + clientCacheFactory.addPoolServer(hostname, port); + ClientCache clientCache = clientCacheFactory.create(); + Region<Object, Object> clientRegion1 = + clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("region1"); + clientRegion1.put("foo", "bar"); + }); + + cache.close(); + + dumpThreads(); + assertTrue(terminatedNormally.get()); + assertTrue(passedPostConditions.get()); + + // cleanup. + AcceptorImplObserver.setInstance(null); + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/1d561dcc/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java index 7aa11b7..58c2157 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java @@ -15,9 +15,13 @@ package org.apache.geode.internal.cache.tier.sockets; import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.ServerRefusedConnectionException; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.DistributedSystem; @@ -42,30 +46,16 @@ import java.net.BindException; import java.net.Socket; import java.util.Collections; import java.util.Properties; +import java.util.Set; +import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID.system; import static org.junit.Assert.*; @Category({IntegrationTest.class, ClientServerTest.class}) public class AcceptorImplJUnitTest { - DistributedSystem system; - InternalCache cache; - - @Before - public void setUp() throws Exception { - Properties p = new Properties(); - p.setProperty(MCAST_PORT, "0"); - this.system = DistributedSystem.connect(p); - this.cache = (InternalCache) CacheFactory.create(system); - } - - @After - public void tearDown() throws Exception { - this.cache.close(); - this.system.disconnect(); - } - /* * Test method for 'org.apache.geode.internal.cache.tier.sockets.AcceptorImpl(int, int, boolean, * int, Cache)' @@ -74,15 +64,16 @@ public class AcceptorImplJUnitTest { @Test public void testConstructor() throws CacheException, IOException { AcceptorImpl a1 = null, a2 = null, a3 = null; - try { + Properties props = new Properties(); + props.setProperty(MCAST_PORT, "0"); + try (InternalCache cache = (InternalCache) new CacheFactory(props).create()) { final int[] freeTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2); int port1 = freeTCPPorts[0]; int port2 = freeTCPPorts[1]; - try { new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, - CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, + CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS - 1, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); @@ -92,7 +83,7 @@ public class AcceptorImplJUnitTest { try { new AcceptorImpl(port2, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, - CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, 0, + CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, 0, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); @@ -102,12 +93,12 @@ public class AcceptorImplJUnitTest { try { a1 = new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, - CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, + CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); a2 = new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, - CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, + CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); @@ -116,13 +107,12 @@ public class AcceptorImplJUnitTest { } a3 = new AcceptorImpl(port2, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, - CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, + CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); assertEquals(port2, a3.getPort()); - InternalDistributedSystem isystem = - (InternalDistributedSystem) this.cache.getDistributedSystem(); + InternalDistributedSystem isystem = (InternalDistributedSystem) cache.getDistributedSystem(); DistributionConfig config = isystem.getConfig(); String bindAddress = config.getBindAddress(); if (bindAddress == null || bindAddress.length() <= 0) {