Repository: geode Updated Branches: refs/heads/develop 659654a0c -> cfd971338
GEODE-3181: Added wan backward compatibility unit tests Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/cfd97133 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/cfd97133 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/cfd97133 Branch: refs/heads/develop Commit: cfd971338811576ce99be9d347729a66047a1233 Parents: 659654a Author: Barry Oglesby <[email protected]> Authored: Mon Jul 10 17:59:08 2017 -0700 Committer: Barry Oglesby <[email protected]> Committed: Wed Jul 12 09:26:36 2017 -0700 ---------------------------------------------------------------------- .../cache/internal/JUnit4CacheTestCase.java | 2 +- .../cache/wan/WANRollingUpgradeDUnitTest.java | 596 ++++++++++++++++++- .../wan/parallel/BatchRemovalThreadHelper.java | 22 + 3 files changed, 605 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/cfd97133/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java index bf77be3..8a63126 100644 --- a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java +++ b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java @@ -315,7 +315,7 @@ public abstract class JUnit4CacheTestCase extends JUnit4DistributedTestCase if (cache != null) { IgnoredException.addIgnoredException("java.net.ConnectException"); } - return (InternalClientCache) cache; + return (ClientCache) cache; } } http://git-wip-us.apache.org/repos/asf/geode/blob/cfd97133/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java index b5776cd..58a5e18 100644 --- a/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java @@ -14,9 +14,26 @@ */ package org.apache.geode.cache.wan; +import static org.junit.Assert.assertEquals; + +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; import org.apache.geode.distributed.Locator; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.AvailablePort; +import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.parallel.BatchRemovalThreadHelper; +import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; +import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; import org.apache.geode.test.dunit.DistributedTestUtils; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.IgnoredException; @@ -28,6 +45,8 @@ import org.apache.geode.test.dunit.standalone.VersionManager; import org.apache.geode.test.junit.categories.BackwardCompatibilityTest; import org.apache.geode.test.junit.categories.DistributedTest; import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; + +import org.awaitility.Awaitility; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -37,6 +56,8 @@ import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; @Category({DistributedTest.class, BackwardCompatibilityTest.class}) @RunWith(Parameterized.class) @@ -64,10 +85,6 @@ public class WANRollingUpgradeDUnitTest extends JUnit4CacheTestCase { @Test // This test verifies that a GatewaySenderProfile serializes properly between versions. public void testVerifyGatewaySenderProfile() throws Exception { - doTestVerifyGatewaySenderProfile(oldVersion); - } - - private void doTestVerifyGatewaySenderProfile(String oldVersion) throws Exception { final Host host = Host.getHost(0); VM oldLocator = host.getVM(oldVersion, 0); VM oldServer = host.getVM(oldVersion, 1); @@ -77,36 +94,472 @@ public class WANRollingUpgradeDUnitTest extends JUnit4CacheTestCase { final int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); DistributedTestUtils.deleteLocatorStateFile(port); final String locators = NetworkUtils.getServerHostName(host) + "[" + port + "]"; - oldLocator.invoke(() -> startLocator(port, locators)); + oldLocator.invoke(() -> startLocator(port, 0, locators, "")); IgnoredException ie = IgnoredException.addIgnoredException("could not get remote locator information"); try { // Start old server - oldServer.invoke(() -> startServer(locators)); + oldServer.invoke(() -> createCache(locators)); // Create GatewaySender in old server - oldServer.invoke(() -> createGatewaySender()); + String senderId = getName() + "_gatewaysender"; + oldServer.invoke(() -> createGatewaySender(senderId, 10, + ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL)); // Start current server - currentServer.invoke(() -> startServer(locators)); + currentServer.invoke(() -> createCache(locators)); // Attempt to create GatewaySender in new server - currentServer.invoke(() -> createGatewaySender()); + currentServer.invoke(() -> createGatewaySender(senderId, 10, + ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL)); } finally { ie.remove(); } } - private void startLocator(int port, String locators) throws IOException { + @Test + public void testEventProcessingOldSiteOneCurrentSiteTwo() throws Exception { + final Host host = Host.getHost(0); + + // Get old site members + VM site1Locator = host.getVM(oldVersion, 0); + VM site1Server1 = host.getVM(oldVersion, 1); + VM site1Server2 = host.getVM(oldVersion, 2); + VM site1Client = host.getVM(oldVersion, 3); + + // Get current site members + VM site2Locator = host.getVM(4); + VM site2Server1 = host.getVM(5); + VM site2Server2 = host.getVM(6); + VM site2Client = host.getVM(7); + + // Get old site locator properties + String hostName = NetworkUtils.getServerHostName(host); + final int site1LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site1LocatorPort); + final String site1Locators = hostName + "[" + site1LocatorPort + "]"; + final int site1DistributedSystemId = 0; + + // Get current site locator properties + final int site2LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort); + final String site2Locators = hostName + "[" + site2LocatorPort + "]"; + final int site2DistributedSystemId = 1; + + // Start old site locator + site1Locator.invoke(() -> startLocator(site1LocatorPort, site1DistributedSystemId, + site1Locators, site2Locators)); + + // Start current site locator + site2Locator.invoke(() -> startLocator(site2LocatorPort, site2DistributedSystemId, + site2Locators, site1Locators)); + + // Start and configure old site servers + String regionName = getName() + "_region"; + String site1SenderId = getName() + "_gatewaysender_" + site2DistributedSystemId; + startAndConfigureServers(site1Server1, site1Server2, site1Locators, site2DistributedSystemId, + regionName, site1SenderId, ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL); + + // Start and configure current site servers + String site2SenderId = getName() + "_gatewaysender_" + site1DistributedSystemId; + startAndConfigureServers(site2Server1, site2Server2, site2Locators, site1DistributedSystemId, + regionName, site2SenderId, ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL); + + // Do puts from old site client and verify events on current site + int numPuts = 100; + doClientPutsAndVerifyEvents(site1Client, site1Server1, site1Server2, site2Server1, site2Server2, + hostName, site1LocatorPort, regionName, numPuts, site1SenderId, false); + + // Do puts from current site client and verify events on old site + doClientPutsAndVerifyEvents(site2Client, site2Server1, site2Server2, site1Server1, site1Server2, + hostName, site2LocatorPort, regionName, numPuts, site2SenderId, false); + + // Do puts from old client in the current site and verify events on old site + site1Client.invoke(() -> closeCache()); + doClientPutsAndVerifyEvents(site1Client, site2Server1, site2Server2, site1Server1, site1Server2, + hostName, site2LocatorPort, regionName, numPuts, site2SenderId, false); + } + + @Test + public void testSecondaryEventsNotReprocessedAfterOldSiteMemberFailover() throws Exception { + final Host host = Host.getHost(0); + + // Get old site members + VM site1Locator = host.getVM(oldVersion, 0); + VM site1Server1 = host.getVM(oldVersion, 1); + VM site1Server2 = host.getVM(oldVersion, 2); + VM site1Client = host.getVM(oldVersion, 3); + + // Get current site members + VM site2Locator = host.getVM(4); + VM site2Server1 = host.getVM(5); + VM site2Server2 = host.getVM(6); + + // Get old site locator properties + String hostName = NetworkUtils.getServerHostName(host); + final int site1LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site1LocatorPort); + final String site1Locators = hostName + "[" + site1LocatorPort + "]"; + final int site1DistributedSystemId = 0; + + // Get current site locator properties + final int site2LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort); + final String site2Locators = hostName + "[" + site2LocatorPort + "]"; + final int site2DistributedSystemId = 1; + + // Start old site locator + site1Locator.invoke(() -> startLocator(site1LocatorPort, site1DistributedSystemId, + site1Locators, site2Locators)); + + // Start current site locator + site2Locator.invoke(() -> startLocator(site2LocatorPort, site2DistributedSystemId, + site2Locators, site1Locators)); + + try { + // Start and configure old site servers with secondary removals prevented + String regionName = getName() + "_region"; + String site1SenderId = getName() + "_gatewaysender_" + site2DistributedSystemId; + startAndConfigureServers(site1Server1, site1Server2, site1Locators, site2DistributedSystemId, + regionName, site1SenderId, Integer.MAX_VALUE); + + // Start and configure current site servers with secondary removals prevented + String site2SenderId = getName() + "_gatewaysender_" + site1DistributedSystemId; + startAndConfigureServers(site2Server1, site2Server2, site2Locators, site1DistributedSystemId, + regionName, site2SenderId, Integer.MAX_VALUE); + + // Do puts from old site client and verify events on current site + int numPuts = 100; + doClientPutsAndVerifyEvents(site1Client, site1Server1, site1Server2, site2Server1, + site2Server2, hostName, site1LocatorPort, regionName, numPuts, site1SenderId, true); + + // Stop one sender in the old site and verify the other resends its events and that those + // events + // are ignored on the current site + stopSenderAndVerifyEvents(site1Server1, site1Server2, site2Server1, site2Server2, + site1SenderId, regionName, numPuts); + } finally { + resetAllMessageSyncIntervals(site1Server1, site1Server2, site2Server1, site2Server2); + } + } + + @Test + public void testSecondaryEventsNotReprocessedAfterCurrentSiteMemberFailover() throws Exception { + final Host host = Host.getHost(0); + + // Get old site members + VM site1Locator = host.getVM(oldVersion, 0); + VM site1Server1 = host.getVM(oldVersion, 1); + VM site1Server2 = host.getVM(oldVersion, 2); + + // Get current site members + VM site2Locator = host.getVM(4); + VM site2Server1 = host.getVM(5); + VM site2Server2 = host.getVM(6); + VM site2Client = host.getVM(7); + + // Get old site locator properties + String hostName = NetworkUtils.getServerHostName(host); + final int site1LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site1LocatorPort); + final String site1Locators = hostName + "[" + site1LocatorPort + "]"; + final int site1DistributedSystemId = 0; + + // Get current site locator properties + final int site2LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort); + final String site2Locators = hostName + "[" + site2LocatorPort + "]"; + final int site2DistributedSystemId = 1; + + // Start old site locator + site1Locator.invoke(() -> startLocator(site1LocatorPort, site1DistributedSystemId, + site1Locators, site2Locators)); + + // Start current site locator + site2Locator.invoke(() -> startLocator(site2LocatorPort, site2DistributedSystemId, + site2Locators, site1Locators)); + + try { + // Start and configure old site servers with secondary removals prevented + String regionName = getName() + "_region"; + String site1SenderId = getName() + "_gatewaysender_" + site2DistributedSystemId; + startAndConfigureServers(site1Server1, site1Server2, site1Locators, site2DistributedSystemId, + regionName, site1SenderId, Integer.MAX_VALUE); + + // Start and configure current site servers with secondary removals prevented + String site2SenderId = getName() + "_gatewaysender_" + site1DistributedSystemId; + startAndConfigureServers(site2Server1, site2Server2, site2Locators, site1DistributedSystemId, + regionName, site2SenderId, Integer.MAX_VALUE); + + // Do puts from current site client and verify events on old site + int numPuts = 100; + doClientPutsAndVerifyEvents(site2Client, site2Server1, site2Server2, site1Server1, + site1Server2, hostName, site2LocatorPort, regionName, numPuts, site2SenderId, true); + + // Stop one sender in the current site and verify the other resends its events and that those + // events are ignored on the old site + stopSenderAndVerifyEvents(site2Server1, site2Server2, site1Server1, site1Server2, + site2SenderId, regionName, numPuts); + } finally { + resetAllMessageSyncIntervals(site1Server1, site1Server2, site2Server1, site2Server2); + } + } + + @Test + public void testSecondaryEventsNotReprocessedAfterCurrentSiteMemberFailoverWithOldClient() + throws Exception { + final Host host = Host.getHost(0); + + // Get old site members + VM site1Locator = host.getVM(oldVersion, 0); + VM site1Server1 = host.getVM(oldVersion, 1); + VM site1Server2 = host.getVM(oldVersion, 2); + VM site1Client = host.getVM(oldVersion, 3); + + // Get current site members + VM site2Locator = host.getVM(4); + VM site2Server1 = host.getVM(5); + VM site2Server2 = host.getVM(6); + + // Get old site locator properties + String hostName = NetworkUtils.getServerHostName(host); + final int site1LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site1LocatorPort); + final String site1Locators = hostName + "[" + site1LocatorPort + "]"; + final int site1DistributedSystemId = 0; + + // Get current site locator properties + final int site2LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort); + final String site2Locators = hostName + "[" + site2LocatorPort + "]"; + final int site2DistributedSystemId = 1; + + // Start old site locator + site1Locator.invoke(() -> startLocator(site1LocatorPort, site1DistributedSystemId, + site1Locators, site2Locators)); + + // Start current site locator + site2Locator.invoke(() -> startLocator(site2LocatorPort, site2DistributedSystemId, + site2Locators, site1Locators)); + + try { + // Start and configure old site servers with secondary removals prevented + String regionName = getName() + "_region"; + String site1SenderId = getName() + "_gatewaysender_" + site2DistributedSystemId; + startAndConfigureServers(site1Server1, site1Server2, site1Locators, site2DistributedSystemId, + regionName, site1SenderId, Integer.MAX_VALUE); + + // Start and configure current site servers with secondary removals prevented + String site2SenderId = getName() + "_gatewaysender_" + site1DistributedSystemId; + startAndConfigureServers(site2Server1, site2Server2, site2Locators, site1DistributedSystemId, + regionName, site2SenderId, Integer.MAX_VALUE); + + // Do puts from old client in the current site and verify events on old site + int numPuts = 100; + doClientPutsAndVerifyEvents(site1Client, site2Server1, site2Server2, site1Server1, + site1Server2, hostName, site2LocatorPort, regionName, numPuts, site2SenderId, true); + + // Stop one sender in the current site and verify the other resends its events and that those + // events are ignored on the remote site + stopSenderAndVerifyEvents(site2Server1, site2Server2, site1Server1, site1Server2, + site2SenderId, regionName, numPuts); + } finally { + resetAllMessageSyncIntervals(site1Server1, site1Server2, site2Server1, site2Server2); + } + } + + @Test + public void testEventProcessingMixedSiteOneOldSiteTwo() throws Exception { + final Host host = Host.getHost(0); + + // Get mixed site members + VM site1Locator = host.getVM(oldVersion, 0); + VM site1Server1 = host.getVM(oldVersion, 1); + VM site1Server2 = host.getVM(2); + VM site1Client = host.getVM(oldVersion, 3); + + // Get old site members + VM site2Locator = host.getVM(oldVersion, 4); + VM site2Server1 = host.getVM(oldVersion, 5); + VM site2Server2 = host.getVM(oldVersion, 6); + + // Get mixed site locator properties + String hostName = NetworkUtils.getServerHostName(host); + final int site1LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site1LocatorPort); + final String site1Locators = hostName + "[" + site1LocatorPort + "]"; + final int site1DistributedSystemId = 0; + + // Get old site locator properties + final int site2LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort); + final String site2Locators = hostName + "[" + site2LocatorPort + "]"; + final int site2DistributedSystemId = 1; + + // Start mixed site locator + site1Locator.invoke(() -> startLocator(site1LocatorPort, site1DistributedSystemId, + site1Locators, site2Locators)); + + // Start old site locator + site2Locator.invoke(() -> startLocator(site2LocatorPort, site2DistributedSystemId, + site2Locators, site1Locators)); + + // Start and configure mixed site servers + String regionName = getName() + "_region"; + String site1SenderId = getName() + "_gatewaysender_" + site2DistributedSystemId; + startAndConfigureServers(site1Server1, site1Server2, site1Locators, site2DistributedSystemId, + regionName, site1SenderId, ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL); + + // Start and configure old site servers + String site2SenderId = getName() + "_gatewaysender_" + site1DistributedSystemId; + startAndConfigureServers(site2Server1, site2Server2, site2Locators, site1DistributedSystemId, + regionName, site2SenderId, ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL); + + // Do puts from mixed site client and verify events on old site + int numPuts = 100; + doClientPutsAndVerifyEvents(site1Client, site1Server1, site1Server2, site2Server1, site2Server2, + hostName, site1LocatorPort, regionName, numPuts, site1SenderId, false); + } + + @Test + public void testEventProcessingMixedSiteOneCurrentSiteTwo() throws Exception { + final Host host = Host.getHost(0); + + // Get mixed site members + VM site1Locator = host.getVM(oldVersion, 0); + VM site1Server1 = host.getVM(oldVersion, 1); + VM site1Server2 = host.getVM(2); + VM site1Client = host.getVM(oldVersion, 3); + + // Get old site members + VM site2Locator = host.getVM(4); + VM site2Server1 = host.getVM(5); + VM site2Server2 = host.getVM(6); + + // Get mixed site locator properties + String hostName = NetworkUtils.getServerHostName(host); + final int site1LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site1LocatorPort); + final String site1Locators = hostName + "[" + site1LocatorPort + "]"; + final int site1DistributedSystemId = 0; + + // Get old site locator properties + final int site2LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort); + final String site2Locators = hostName + "[" + site2LocatorPort + "]"; + final int site2DistributedSystemId = 1; + + // Start mixed site locator + site1Locator.invoke(() -> startLocator(site1LocatorPort, site1DistributedSystemId, + site1Locators, site2Locators)); + + // Start old site locator + site2Locator.invoke(() -> startLocator(site2LocatorPort, site2DistributedSystemId, + site2Locators, site1Locators)); + + // Start and configure mixed site servers + String regionName = getName() + "_region"; + String site1SenderId = getName() + "_gatewaysender_" + site2DistributedSystemId; + startAndConfigureServers(site1Server1, site1Server2, site1Locators, site2DistributedSystemId, + regionName, site1SenderId, ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL); + + // Start and configure old site servers + String site2SenderId = getName() + "_gatewaysender_" + site1DistributedSystemId; + startAndConfigureServers(site2Server1, site2Server2, site2Locators, site1DistributedSystemId, + regionName, site2SenderId, ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL); + + // Do puts from mixed site client and verify events on old site + int numPuts = 100; + doClientPutsAndVerifyEvents(site1Client, site1Server1, site1Server2, site2Server1, site2Server2, + hostName, site1LocatorPort, regionName, numPuts, site1SenderId, false); + } + + private void startLocator(int port, int distributedSystemId, String locators, + String remoteLocators) throws IOException { Properties props = new Properties(); props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, + String.valueOf(distributedSystemId)); props.setProperty(DistributionConfig.LOCATORS_NAME, locators); + props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, remoteLocators); props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel); + props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); Locator.startLocatorAndDS(port, null, props); } - private void startServer(String locators) { + private void startAndConfigureServers(VM server1, VM server2, String locators, + int distributedSystem, String regionName, String senderId, int messageSyncInterval) { + // Start and configure servers + // - Create Cache + // - Create CacheServer + // - Create GatewaySender + // - Create GatewayReceiver + // - Create Region + + // Start and configure server 1 + server1.invoke(() -> createCache(locators)); + server1.invoke(() -> addCacheServer()); + server1.invoke(() -> createGatewaySender(senderId, distributedSystem, messageSyncInterval)); + server1.invoke(() -> createGatewayReceiver()); + server1.invoke(() -> createPartitionedRegion(regionName, senderId)); + + // Start and configure server 2 if necessary + if (server2 != null) { + server2.invoke(() -> createCache(locators)); + server2.invoke(() -> addCacheServer()); + server2.invoke(() -> createGatewaySender(senderId, distributedSystem, messageSyncInterval)); + server2.invoke(() -> createGatewayReceiver()); + server2.invoke(() -> createPartitionedRegion(regionName, senderId)); + } + } + + private void doClientPutsAndVerifyEvents(VM client, VM localServer1, VM localServer2, + VM remoteServer1, VM remoteServer2, String hostName, int locatorPort, String regionName, + int numPuts, String senderId, boolean primaryOnly) { + // Start client + client.invoke(() -> startClient(hostName, locatorPort, regionName)); + + // Do puts from client + client.invoke(() -> doPuts(regionName, numPuts)); + + // Wait for local site queues to be empty + localServer1.invoke(() -> waitForEmptyQueueRegion(senderId, primaryOnly)); + localServer2.invoke(() -> waitForEmptyQueueRegion(senderId, primaryOnly)); + + // Verify remote site received events + int remoteServer1EventsReceived = remoteServer1.invoke(() -> getEventsReceived(regionName)); + int remoteServer2EventsReceived = remoteServer2.invoke(() -> getEventsReceived(regionName)); + assertEquals(numPuts, remoteServer1EventsReceived + remoteServer2EventsReceived); + + // Clear events received in both sites + localServer1.invoke(() -> clearEventsReceived(regionName)); + localServer2.invoke(() -> clearEventsReceived(regionName)); + remoteServer1.invoke(() -> clearEventsReceived(regionName)); + remoteServer2.invoke(() -> clearEventsReceived(regionName)); + } + + private void stopSenderAndVerifyEvents(VM localServer1, VM localServer2, VM remoteServer1, + VM remoteServer2, String senderId, String regionName, int numPuts) { + // Verify the secondary events still exist + int localServer1QueueSize = localServer1.invoke(() -> getQueueRegionSize(senderId, false)); + int localServer2QueueSize = localServer2.invoke(() -> getQueueRegionSize(senderId, false)); + assertEquals(numPuts, localServer1QueueSize + localServer2QueueSize); + + // Stop one sender + localServer1.invoke(() -> closeCache()); + + // Wait for the other sender's queue to be empty + localServer2.invoke(() -> waitForEmptyQueueRegion(senderId, false)); + + // Verify remote site did not receive any events. The events received were previously cleared on + // all members, so there should be 0 events received on the remote site. + int remoteServer1EventsReceived = remoteServer1.invoke(() -> getEventsReceived(regionName)); + int remoteServer2EventsReceived = remoteServer2.invoke(() -> getEventsReceived(regionName)); + assertEquals(0, remoteServer1EventsReceived + remoteServer2EventsReceived); + } + + private void createCache(String locators) { Properties props = new Properties(); props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); props.setProperty(DistributionConfig.LOCATORS_NAME, locators); @@ -114,10 +567,125 @@ public class WANRollingUpgradeDUnitTest extends JUnit4CacheTestCase { getCache(props); } - protected void createGatewaySender() throws Exception { + private void addCacheServer() throws Exception { + CacheServer server = getCache().addCacheServer(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + server.setPort(port); + server.start(); + } + + private void startClient(String hostName, int locatorPort, String regionName) { + ClientCacheFactory ccf = new ClientCacheFactory().addPoolLocator(hostName, locatorPort); + ClientCache cache = getClientCache(ccf); + cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName); + } + + private void createGatewaySender(String id, int remoteDistributedSystemId, + int messageSyncInterval) throws Exception { + // Setting the messageSyncInterval controls how often the BatchRemovalThread sends processed + // events from the primary to the secondary. Setting it high prevents the events from being + // removed from the secondary. + BatchRemovalThreadHelper.setMessageSyncInterval(messageSyncInterval); GatewaySenderFactory gsf = getCache().createGatewaySenderFactory(); gsf.setParallel(true); - gsf.setOrderPolicy(GatewaySender.DEFAULT_ORDER_POLICY); - gsf.create(getName() + "_gatewaysender", 10); + gsf.create(id, remoteDistributedSystemId); + } + + private void resetAllMessageSyncIntervals(VM site1Server1, VM site1Server2, VM site2Server1, + VM site2Server2) { + site1Server1.invoke(() -> resetMessageSyncInterval()); + site1Server2.invoke(() -> resetMessageSyncInterval()); + site2Server1.invoke(() -> resetMessageSyncInterval()); + site2Server2.invoke(() -> resetMessageSyncInterval()); + } + + private void resetMessageSyncInterval() { + BatchRemovalThreadHelper + .setMessageSyncInterval(ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL); + } + + private void createGatewayReceiver() { + getCache().createGatewayReceiverFactory().create(); + } + + private void createPartitionedRegion(String regionName, String gatewaySenderId) { + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(1); + paf.setTotalNumBuckets(10); + getCache().createRegionFactory(RegionShortcut.PARTITION_REDUNDANT) + .addCacheListener(new EventCountCacheListener()).addGatewaySenderId(gatewaySenderId) + .setPartitionAttributes(paf.create()).create(regionName); + } + + private void doPuts(String regionName, int numPuts) { + Region region = getCache().getRegion(regionName); + for (int i = 0; i < numPuts; i++) { + region.put(i, i); + } + } + + private void waitForEmptyQueueRegion(String gatewaySenderId, boolean primaryOnly) + throws Exception { + Awaitility.await().atMost(60, TimeUnit.SECONDS) + .until(() -> getQueueRegionSize(gatewaySenderId, primaryOnly) == 0); + } + + private int getQueueRegionSize(String gatewaySenderId, boolean primaryOnly) throws Exception { + // This method currently only supports parallel senders. It gets the size of the local data set + // from the + // underlying colocated region. Depending on the value of primaryOnly, it gets either the local + // primary data set (just primary buckets) or all local data set (primary and secondary + // buckets). + AbstractGatewaySender ags = + (AbstractGatewaySender) getCache().getGatewaySender(gatewaySenderId); + ConcurrentParallelGatewaySenderQueue prq = + (ConcurrentParallelGatewaySenderQueue) ags.getQueues().iterator().next(); + Region region = prq.getRegion(); + Region localDataSet = primaryOnly ? PartitionRegionHelper.getLocalPrimaryData(region) + : PartitionRegionHelper.getLocalData(region); + return localDataSet.size(); + } + + private Integer getEventsReceived(String regionName) { + Region region = getCache().getRegion(regionName); + EventCountCacheListener cl = + (EventCountCacheListener) region.getAttributes().getCacheListener(); + return cl.getEventsReceived(); + } + + private void clearEventsReceived(String regionName) { + Region region = getCache().getRegion(regionName); + EventCountCacheListener cl = + (EventCountCacheListener) region.getAttributes().getCacheListener(); + cl.clearEventsReceived(); + } + + private static class EventCountCacheListener extends CacheListenerAdapter { + + private AtomicInteger eventsReceived = new AtomicInteger(); + + public void afterCreate(EntryEvent event) { + process(event); + } + + public void afterUpdate(EntryEvent event) { + process(event); + } + + private void process(EntryEvent event) { + incrementEventsReceived(); + } + + private int incrementEventsReceived() { + return this.eventsReceived.incrementAndGet(); + } + + private int getEventsReceived() { + return this.eventsReceived.get(); + } + + private void clearEventsReceived() { + this.eventsReceived.set(0); + } } } http://git-wip-us.apache.org/repos/asf/geode/blob/cfd97133/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/BatchRemovalThreadHelper.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/BatchRemovalThreadHelper.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/BatchRemovalThreadHelper.java new file mode 100644 index 0000000..b730bcb --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/BatchRemovalThreadHelper.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.parallel; + +public class BatchRemovalThreadHelper { + + public static void setMessageSyncInterval(int intervalMs) { + ParallelGatewaySenderQueue.messageSyncInterval = intervalMs; + } +}
