This is an automated email from the ASF dual-hosted git repository.
mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 38139fb GEODE-8329: Fix for durable CQ registration recovery (#5360)
38139fb is described below
commit 38139fbb00cbea872348d3554f9589bd7c5bfdde
Author: Jakov Varenina <[email protected]>
AuthorDate: Mon Dec 7 12:35:49 2020 +0100
GEODE-8329: Fix for durable CQ registration recovery (#5360)
* GEODE-8329: Fix for durable CQ registration recovery
This change fixes an issue where a client without HA configured
wrongly re-registers durable CQs as non-durable during server
failover.
* Fix for stressTest
* empty commit to re-launch CI
---
.../cache/client/internal/QueueManagerImpl.java | 3 +-
.../tier/sockets/DurableClientCQDUnitTest.java | 139 +++++++++++++++++++++
.../cache/tier/sockets/DurableClientTestBase.java | 52 ++++++--
3 files changed, 183 insertions(+), 11 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
index 212d2de..145817c 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
@@ -1112,7 +1112,8 @@ public class QueueManagerImpl implements QueueManager {
.set(((DefaultQueryService)
this.pool.getQueryService()).getUserAttributes(name));
}
try {
- if (((CqStateImpl) cqi.getState()).getState() != CqStateImpl.INIT) {
+ if (((CqStateImpl) cqi.getState()).getState() != CqStateImpl.INIT
+ && cqi.isDurable() == isDurable) {
cqi.createOn(recoveredConnection, isDurable);
}
} finally {
diff --git
a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java
b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java
index 4b24be9..cc171eb 100644
---
a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java
+++
b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java
@@ -18,6 +18,7 @@ package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -51,10 +52,13 @@ import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
import org.apache.geode.internal.cache.ClientServerObserverAdapter;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
@Category({ClientSubscriptionTest.class})
@@ -145,6 +149,107 @@ public class DurableClientCQDUnitTest extends
DurableClientTestBase {
}
/**
+ * Test that durable CQ is correctly re-registered to new server after the
failover and
+ * that the durable client functionality works as expected.
+ * Steps:
+ * 1. Start two servers
+ * 2. Start durable client without HA and register durable CQs
+ * 3. Shutdown the server that is hosting CQs subscription queue (primary
server)
+ * 4. Wait for the durable client to perform the failover to another
server
+ * 5. Shutdown the durable client with keepAlive flag set to true
+ * 6. Provision remaining server with the data that should fulfil CQ
condition and fill the queue
+ * 7. Start the durable client again and check that it receives correct
events from queue
+ */
+ @Test
+ public void testDurableCQServerFailoverWithoutHAConfigured()
+ throws Exception {
+ String greaterThan5Query = "select * from " + SEPARATOR + regionName + " p
where p.ID > 5";
+ String allQuery = "select * from " + SEPARATOR + regionName + " p where
p.ID > -1";
+ String lessThan5Query = "select * from " + SEPARATOR + regionName + " p
where p.ID < 5";
+
+ // Start a server 1
+ server1Port = this.server1VM
+ .invoke(() -> CacheServerTestUtil.createCacheServer(regionName,
Boolean.TRUE));
+
+ // Start server 2
+ server2Port = this.server2VM.invoke(CacheServerTestUtil.class,
+ "createCacheServer", new Object[] {regionName, Boolean.TRUE});
+
+ // Start a durable client that is kept alive on the server when it stops
normally
+ durableClientId = getName() + "_client";
+ CacheServerTestUtil.createCacheClient(
+ getClientPool(NetworkUtils.getServerHostName(), server1Port,
server2Port, true, 0),
+ regionName, getClientDistributedSystemProperties(durableClientId),
Boolean.TRUE);
+
+ // register non durable cq
+ createCq("GreaterThan5", greaterThan5Query, false).execute();
+
+ // register durable cqs
+ createCq("All", allQuery, true).execute();
+ createCq("LessThan5", lessThan5Query, true).execute();
+
+ // send client ready
+ CacheServerTestUtil.getClientCache().readyForEvents();
+
+ int oldPrimaryPort = getPrimaryServerPort();
+ // Close the server that is hosting subscription queue
+ VM primary = getPrimaryServerVM();
+ // Verify durable client on server
+
verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT,
durableClientId,
+ primary);
+
+ primary.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
+
+ // Wait until failover to the another server is successfully performed
+ waitForFailoverToPerform(oldPrimaryPort);
+ primary = getPrimaryServerVM();
+ waitForDurableClientPresence(durableClientId, primary, 1);
+ int primaryPort = getPrimaryServerPort();
+
+ // Stop the durable client
+ CacheServerTestUtil.closeCache(true);
+
+ // Start normal publisher client
+ startClient(publisherClientVM, primaryPort, regionName);
+
+ // Publish some entries
+ publishEntries(regionName, 10);
+
+ // Restart the durable client
+ CacheServerTestUtil.createCacheClient(
+ getClientPool(NetworkUtils.getServerHostName(), primaryPort, true),
+ regionName, getClientDistributedSystemProperties(durableClientId),
Boolean.TRUE);
+ assertThat(CacheServerTestUtil.getClientCache()).isNotNull();
+
+ // Re-register non durable cq
+ createCq("GreaterThan5", greaterThan5Query, false).execute();
+
+ // Re-register durable cqs
+ createCq("All", allQuery, true).execute();
+ createCq("LessThan5", lessThan5Query, true).execute();
+
+ // send client ready
+ CacheServerTestUtil.getClientCache().readyForEvents();
+
+ // verify cq events for all 3 cqs
+ checkCqListenerEvents("GreaterThan5", 0 /* numEventsExpected */,
+ /* numEventsToWaitFor */ 15/* secondsToWait */);
+ checkCqListenerEvents("LessThan5", 5 /* numEventsExpected */,
+ /* numEventsToWaitFor */ 15/* secondsToWait */);
+ checkCqListenerEvents("All", 10 /* numEventsExpected */,
+ /* numEventsToWaitFor */ 15/* secondsToWait */);
+
+ primary = getPrimaryServerVM();
+ // Stop the durable client
+ CacheServerTestUtil.closeCache(false);
+ // Stop the publisher client
+ this.publisherClientVM.invoke((SerializableRunnableIF)
CacheServerTestUtil::closeCache);
+ // Stop the remaining server
+ primary.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
+ }
+
+
+ /**
* Test functionality to close the cq and drain all events from the ha queue
from the server This
* draining should not affect events that still have register interest
*/
@@ -782,6 +887,8 @@ public class DurableClientCQDUnitTest extends
DurableClientTestBase {
@Test
public void testGetAllDurableCqsFromServer() {
+
+
// Start server 1
server1Port = this.server1VM.invoke(CacheServerTestUtil.class,
"createCacheServer", new Object[] {regionName, Boolean.TRUE});
@@ -971,6 +1078,38 @@ public class DurableClientCQDUnitTest extends
DurableClientTestBase {
vm.invoke(cacheSerializableRunnable);
}
+ public VM getPrimaryServerVM() {
+ if (this.server1Port == getPrimaryServerPort()) {
+ return server1VM;
+ } else {
+ return server2VM;
+ }
+ }
+
+ public int getPrimaryServerPort() {
+ PoolImpl pool = CacheServerTestUtil.getPool();
+ ServerLocation primaryServerLocation = pool.getPrimary();
+ return primaryServerLocation.getPort();
+ }
+
+ public void waitForFailoverToPerform(int oldPrimaryPort) {
+ final PoolImpl pool = CacheServerTestUtil.getPool();
+ WaitCriterion ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return pool.getPrimary() != null && pool.getPrimary().getPort() !=
oldPrimaryPort;
+ }
+
+ @Override
+ public String description() {
+ return null;
+ }
+ };
+
+ GeodeAwaitility.await().untilAsserted(ev);
+ assertNotNull(pool.getPrimary());
+ }
+
void registerDurableCq(final String cqName) {
// Durable client registers durable cq on server
this.durableClientVM.invoke(new CacheSerializableRunnable("Register Cq") {
diff --git
a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java
b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java
index 796e58f..d01d5cb 100644
---
a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java
+++
b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java
@@ -60,10 +60,12 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PoolFactoryImpl;
import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
public class DurableClientTestBase extends JUnit4DistributedTestCase {
@@ -80,9 +82,9 @@ public class DurableClientTestBase extends
JUnit4DistributedTestCase {
VM publisherClientVM;
protected String regionName;
int server1Port;
+ int server2Port;
String durableClientId;
-
@Override
public final void postSetUp() throws Exception {
this.server1VM = VM.getVM(0);
@@ -172,6 +174,32 @@ public class DurableClientTestBase extends
JUnit4DistributedTestCase {
verifyDurableClientPresence(durableClientTimeout, durableClientId,
serverVM, 0);
}
+ void waitForDurableClientPresence(String durableClientId, VM serverVM, final
int count) {
+ serverVM.invoke(() -> {
+ if (count > 0) {
+
+ WaitCriterion ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ checkNumberOfClientProxies(count);
+ CacheClientProxy proxy = getClientProxy();
+
+ if (proxy != null && durableClientId.equals(proxy.getDurableId()))
{
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String description() {
+ return null;
+ }
+ };
+ GeodeAwaitility.await().untilAsserted(ev);
+ }
+ });
+ }
+
void verifyDurableClientPresence(int durableClientTimeout, String
durableClientId,
VM serverVM, final int count) {
serverVM.invoke(() -> {
@@ -372,7 +400,7 @@ public class DurableClientTestBase extends
JUnit4DistributedTestCase {
}
}
- private CqQuery createCq(String cqName, String cqQuery, boolean durable)
+ CqQuery createCq(String cqName, String cqQuery, boolean durable)
throws CqException, CqExistsException {
QueryService qs = CacheServerTestUtil.getCache().getQueryService();
CqAttributesFactory cqf = new CqAttributesFactory();
@@ -461,7 +489,6 @@ public class DurableClientTestBase extends
JUnit4DistributedTestCase {
return bridgeServer;
}
-
Pool getClientPool(String host, int server1Port, int server2Port,
boolean establishCallbackConnection, int redundancyLevel) {
PoolFactory pf = PoolManager.createFactory();
@@ -664,16 +691,21 @@ public class DurableClientTestBase extends
JUnit4DistributedTestCase {
void checkCqListenerEvents(VM vm, final String cqName, final int numEvents,
final int secondsToWait) {
vm.invoke(() -> {
- QueryService qs = CacheServerTestUtil.getCache().getQueryService();
- CqQuery cq = qs.getCq(cqName);
- // Get the listener and wait for the appropriate number of events
- CacheServerTestUtil.ControlCqListener listener =
- (CacheServerTestUtil.ControlCqListener)
cq.getCqAttributes().getCqListener();
- listener.waitWhileNotEnoughEvents(secondsToWait * 1000, numEvents);
- assertThat(numEvents).isEqualTo(listener.events.size());
+ checkCqListenerEvents(cqName, numEvents, secondsToWait);
});
}
+ void checkCqListenerEvents(final String cqName, final int numEvents,
+ final int secondsToWait) {
+ QueryService qs = CacheServerTestUtil.getCache().getQueryService();
+ CqQuery cq = qs.getCq(cqName);
+ // Get the listener and wait for the appropriate number of events
+ CacheServerTestUtil.ControlCqListener listener =
+ (CacheServerTestUtil.ControlCqListener)
cq.getCqAttributes().getCqListener();
+ listener.waitWhileNotEnoughEvents(secondsToWait * 1000, numEvents);
+ assertThat(numEvents).isEqualTo(listener.events.size());
+ }
+
void checkListenerEvents(int numberOfEntries, final int sleepMinutes, final
int eventType,
final VM vm) {
vm.invoke(() -> {