Repository: incubator-geode Updated Branches: refs/heads/develop 86c89a94d -> 3b7e3ff35
GEODE-911: Cleaning up SerialGatewaySender queues when stopping gateway The stop() method was setting the EventProcessor to null before cleaning up the queues. This led to no queues being cleaned up because the method getQueues would return null if the eventProcessor was null. Refactors WanTestBase code, remove/condensing duplicated code Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3b7e3ff3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3b7e3ff3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3b7e3ff3 Branch: refs/heads/develop Commit: 3b7e3ff357f52da388d2cfd91acf2385f1f7d2dd Parents: 86c89a9 Author: Jason Huynh <[email protected]> Authored: Thu Mar 17 16:24:40 2016 -0700 Committer: Jason Huynh <[email protected]> Committed: Wed Mar 23 10:13:48 2016 -0700 ---------------------------------------------------------------------- .../wan/serial/SerialGatewaySenderQueue.java | 7 + .../wan/serial/SerialGatewaySenderImpl.java | 26 +-- .../gemfire/internal/cache/wan/WANTestBase.java | 218 ++++++------------- ...GatewaySenderOperationsOffHeapDUnitTest.java | 2 +- .../SerialGatewaySenderOperationsDUnitTest.java | 7 +- 5 files changed, 84 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3b7e3ff3/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java index 430409a..1a3ae8e 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -1046,6 +1046,13 @@ public class SerialGatewaySenderQueue implements RegionQueue { } } + public boolean isRemovalThreadAlive() { + if (this.removalThread != null) { + return this.removalThread.isAlive(); + } + return false; + } + @Override public void close() { Region r = getRegion(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3b7e3ff3/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java index 8f0070f..bf9fc56 100644 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java +++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java @@ -15,17 +15,15 @@ * limitations under the License. */ package com.gemstone.gemfire.internal.cache.wan.serial; +import java.util.Set; + import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; -import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; -import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats; -import com.gemstone.gemfire.cache.wan.GatewaySender; import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; import com.gemstone.gemfire.distributed.DistributedLockService; import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile; -import com.gemstone.gemfire.distributed.internal.DistributionAdvisor; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; import com.gemstone.gemfire.distributed.internal.ResourceEvent; import com.gemstone.gemfire.internal.cache.EntryEventImpl; @@ -34,15 +32,11 @@ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.RegionQueue; import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor; import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier; -import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender; import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor; +import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender; import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor; import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes; import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats; -import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; @@ -141,8 +135,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { if (ev != null && !ev.isStopped()) { ev.stopProcessing(); } - this.eventProcessor = null; - + // Stop the proxy (after the dispatcher, so the socket is still // alive until after the dispatcher has stopped) stompProxyDead(); @@ -152,11 +145,12 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { listener.close(); } logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this)); - + clearTempEventsAfterSenderStopped(); } finally { this.getLifeCycleLock().writeLock().unlock(); } + if (this.isPrimary()) { try { DistributedLockService @@ -165,11 +159,13 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { // service not found... ignore } } - if (getQueues() != null && !getQueues().isEmpty()) { - for (RegionQueue q : getQueues()) { + Set<RegionQueue> queues = getQueues(); + if (queues != null && !queues.isEmpty()) { + for (RegionQueue q : queues) { ((SerialGatewaySenderQueue)q).cleanUp(); } } + this.setIsPrimary(false); new UpdateAttributesProcessor(this).distribute(false); Thread lockObtainingThread = getSenderAdvisor().getLockObtainingThread(); @@ -190,6 +186,8 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { InternalDistributedSystem system = (InternalDistributedSystem) this.cache .getDistributedSystem(); system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this); + + this.eventProcessor = null; } @Override http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3b7e3ff3/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java index 2f44fce..bfcb910 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java @@ -112,6 +112,8 @@ import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewa import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor; import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue; +import com.gemstone.gemfire.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor; +import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderQueue; import com.gemstone.gemfire.pdx.SimpleClass; import com.gemstone.gemfire.pdx.SimpleClass1; import com.gemstone.gemfire.test.dunit.AsyncInvocation; @@ -2221,7 +2223,7 @@ public class WANTestBase extends DistributedTestCase{ exln.remove(); } } - + public static void stopSender(String senderId) { final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class @@ -2235,7 +2237,21 @@ public class WANTestBase extends DistributedTestCase{ break; } } + AbstractGatewaySenderEventProcessor eventProcessor = null; + if (sender instanceof AbstractGatewaySender) { + eventProcessor = ((AbstractGatewaySender) sender).getEventProcessor(); + } sender.stop(); + + Set<RegionQueue> queues = null; + if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) { + queues = ((ConcurrentSerialGatewaySenderEventProcessor)eventProcessor).getQueues(); + for (RegionQueue queue: queues) { + if (queue instanceof SerialGatewaySenderQueue) { + assertFalse(((SerialGatewaySenderQueue) queue).isRemovalThreadAlive()); + } + } + } } finally { exp.remove(); @@ -2261,6 +2277,35 @@ public class WANTestBase extends DistributedTestCase{ } } + public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName, int remoteDsId, + boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isPersistent, + GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy policy) { + + InternalGatewaySenderFactory gateway = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory(); + gateway.setParallel(isParallel); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setBatchConflationEnabled(isConflation); + gateway.setManualStart(isManualStart); + gateway.setDispatcherThreads(numDispatchers); + gateway.setOrderPolicy(policy); + ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback()); + if (filter != null) { + eventFilter = filter; + gateway.addGatewayEventFilter(filter); + } + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) + .getName()); + } else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + return gateway; + } + public static void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, @@ -2272,55 +2317,9 @@ public class WANTestBase extends DistributedTestCase{ persistentDirectory.mkdir(); DiskStoreFactory dsf = cache.createDiskStoreFactory(); File[] dirs1 = new File[] { persistentDirectory }; - if (isParallel) { - InternalGatewaySenderFactory gateway = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory(); - gateway.setParallel(true); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - //set dispatcher threads - gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); - ((InternalGatewaySenderFactory) gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - eventFilter = filter; - gateway.addGatewayEventFilter(filter); - } - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.setBatchConflationEnabled(isConflation); - gateway.create(dsName, remoteDsId); + GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY); + gateway.create(dsName, remoteDsId); - } else { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - //set dispatcher threads - gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); - ((InternalGatewaySenderFactory) gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - eventFilter = filter; - gateway.addGatewayEventFilter(filter); - } - gateway.setBatchConflationEnabled(isConflation); - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.create(dsName, remoteDsId); - } } finally { exln.remove(); } @@ -2329,66 +2328,20 @@ public class WANTestBase extends DistributedTestCase{ public static void createSenderWithMultipleDispatchers(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, - GatewayEventFilter filter, boolean isManulaStart, int numDispatchers, OrderPolicy orderPolicy) { + GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy orderPolicy) { final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); - try { - File persistentDirectory = new File(dsName + "_disk_" - + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - persistentDirectory.mkdir(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File[] dirs1 = new File[] { persistentDirectory }; - if (isParallel) { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setParallel(true); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManulaStart); - ((InternalGatewaySenderFactory) gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - eventFilter = filter; - gateway.addGatewayEventFilter(filter); - } - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.setBatchConflationEnabled(isConflation); - gateway.setDispatcherThreads(numDispatchers); - gateway.setOrderPolicy(orderPolicy); - gateway.create(dsName, remoteDsId); - - } else { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManulaStart); - ((InternalGatewaySenderFactory) gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - eventFilter = filter; - gateway.addGatewayEventFilter(filter); - } - gateway.setBatchConflationEnabled(isConflation); - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.setDispatcherThreads(numDispatchers); - gateway.setOrderPolicy(orderPolicy); - gateway.create(dsName, remoteDsId); - } - } finally { - exln.remove(); - } + try { + File persistentDirectory = new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File[] dirs1 = new File[] { persistentDirectory }; + GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, + isManualStart, numDispatchers, orderPolicy); + gateway.create(dsName, remoteDsId); + + } finally { + exln.remove(); + } } public static void createSenderWithoutDiskStore(String dsName, int remoteDsId, @@ -2417,53 +2370,8 @@ public class WANTestBase extends DistributedTestCase{ persistentDirectory.mkdir(); DiskStoreFactory dsf = cache.createDiskStoreFactory(); File[] dirs1 = new File[] { persistentDirectory }; - - if (isParallel) { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setParallel(true); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - ((InternalGatewaySenderFactory) gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.setBatchConflationEnabled(isConflation); - gateway.setDispatcherThreads(concurrencyLevel); - gateway.setOrderPolicy(policy); - gateway.create(dsName, remoteDsId); - } else { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - ((InternalGatewaySenderFactory) gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - gateway.setBatchConflationEnabled(isConflation); - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.setDispatcherThreads(concurrencyLevel); - gateway.setOrderPolicy(policy); - gateway.create(dsName, remoteDsId); - } + GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy); + gateway.create(dsName, remoteDsId); } // public static void createSender_PDX(String dsName, int remoteDsId, http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3b7e3ff3/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java index e24f593..172a4f2 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java @@ -28,5 +28,5 @@ public class ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest extends public boolean isOffHeap() { return true; } - + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3b7e3ff3/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java index ea6de11..00778e9 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java @@ -29,6 +29,7 @@ import com.gemstone.gemfire.distributed.Locator; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; import com.gemstone.gemfire.distributed.internal.InternalLocator; import com.gemstone.gemfire.distributed.internal.ServerLocator; +import com.gemstone.gemfire.internal.cache.RegionQueue; import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException; @@ -420,7 +421,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_RR", 10, 110 )); validateQueueContents(vm5, "ln", 100); - validateQueueClosedVM4(); vm5.invoke(() -> WANTestBase.stopSender( "ln" )); vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); @@ -442,11 +442,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { getTestMethodName() + "_RR", 110 )); vm4.invoke(() -> WANTestBase.stopSender( "ln" )); } - - private void validateQueueClosedVM4() { - // TODO Auto-generated method stub - - } private void validateQueueContents(VM vm, String site, int size) { vm.invoke(() -> WANTestBase.validateQueueContents( site,
