Repository: incubator-geode Updated Branches: refs/heads/develop 0e3a60b7d -> 7e5f16d68
GEODE-1035 CI failure: DeltaPropagationWithCQDUnitTest.testFullValueRequestsWithCqWithoutRI GEODE-1035 #close The test had verification-step problems Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7e5f16d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7e5f16d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7e5f16d6 Branch: refs/heads/develop Commit: 7e5f16d688c7be5b8a85657fed1ec041c8ffb535 Parents: 0e3a60b Author: Bruce Schuchardt <[email protected]> Authored: Mon Mar 28 15:13:06 2016 -0700 Committer: Bruce Schuchardt <[email protected]> Committed: Mon Mar 28 15:13:06 2016 -0700 ---------------------------------------------------------------------- .../DeltaPropagationWithCQDUnitTest.java | 63 ++++++++++++++++---- 1 file changed, 50 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e5f16d6/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java index 6e5521a..1ea7970 100644 --- a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java +++ b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java @@ -16,7 +16,11 @@ */ package com.gemstone.gemfire.internal.cache.tier.sockets; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; import java.util.Properties; +import java.util.Set; import com.gemstone.gemfire.DeltaTestImpl; import com.gemstone.gemfire.cache.AttributesFactory; @@ -87,6 +91,8 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase { private static long cqEvents = 0; private static long cqErrors = 0; + + private static long deltasFound = 0; /** * @param name @@ -181,8 +187,10 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase { client1.invoke(() -> DeltaPropagationWithCQDUnitTest.doPuts(numOfKeys, true)); // verify client2's CQ listeners see above puts verifyCqListeners(numOfListeners * numOfKeys * numOfCQs * 2); + // verify number of deltas encountered in this client + assertEquals(numOfKeys, deltasFound); // verify full value requests at server - server1.invoke(() -> DeltaPropagationWithCQDUnitTest.verifyFullValueRequestsFromClients(10L)); + server1.invoke(() -> DeltaPropagationWithCQDUnitTest.verifyFullValueRequestsFromClients(numOfKeys*1l)); } public static void verifyCqListeners(final Integer events) throws Exception { @@ -193,6 +201,7 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase { } public boolean done() { + System.out.println("verifyCqListeners: expected total="+events+"; cqEvents="+cqEvents+"; cqErrors="+cqErrors); return (cqEvents + cqErrors) == events; } }; @@ -201,19 +210,23 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase { public static void verifyFullValueRequestsFromClients(Long expected) throws Exception { - Object[] proxies = ((CacheServerImpl)((GemFireCacheImpl)cache) - .getCacheServers().get(0)).getAcceptor().getCacheClientNotifier() - .getClientProxies().toArray(); - long fullValueRequests = ((CacheClientProxy)proxies[0]).getStatistics() - .getDeltaFullMessagesSent(); - if (fullValueRequests == 0) { - assertEquals("Full value requests, ", expected.longValue(), - ((CacheClientProxy)proxies[1]).getStatistics() - .getDeltaFullMessagesSent()); - } else { - assertEquals("Full value requests, ", expected.longValue(), - fullValueRequests); + List<CacheServerImpl> servers = ((GemFireCacheImpl)cache).getCacheServers(); + assertEquals("expected one server but found these: " + servers, 1, servers.size()); + + CacheClientProxy[] proxies = servers.get(0).getAcceptor().getCacheClientNotifier() + .getClientProxies().toArray(new CacheClientProxy[0]); + + // find the proxy for the client that processed the CQs - it will have + // incremented its deltaFullMessagesSent statistic when the listener invoked + // getValue() on the event and caused a RequestEventValue command to be + // invoked on the server + long fullValueRequests = 0; + for (int i=0; (i < proxies.length) && (fullValueRequests <= 0l); i++) { + CacheClientProxy proxy = proxies[i]; + fullValueRequests = proxy.getStatistics().getDeltaFullMessagesSent(); } + + assertEquals("Full value requests, ", expected.longValue(), fullValueRequests); } public static void doPut(Object key, Object value) throws Exception { @@ -314,13 +327,37 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase { for (int i = 0; i < numOfListeners; i++) { cqListeners[i] = new CqListenerAdapter() { public void onEvent(CqEvent event) { + System.out.println("CqListener.onEvent invoked. Event="+event); + if (event.getDeltaValue() != null) { + deltasFound++; + } + // The first CQ event dispatched with a delta will not have a newValue. + // Attempting to access the newValue will cause an exception to be + // thrown, exiting this listener and causing the full value to be + // read from the server. The listener is then invoked a second time + // and getNewValue will succeed event.getNewValue(); + if (event.getDeltaValue() != null) { + // if there's a newValue we should ignore the delta bytes + deltasFound--; + } + System.out.println("deltasFound="+deltasFound); cqEvents++; + System.out.println("cqEvents is now " + cqEvents); } public void onError(CqEvent event) { + System.out.println("CqListener.onError invoked. Event="+event); + if (event.getDeltaValue() != null) { + deltasFound++; + } event.getNewValue(); + if (event.getDeltaValue() != null) { + deltasFound--; + } + System.out.println("deltasFound="+deltasFound); cqErrors++; + System.out.println("cqErrors is now " + cqErrors); } }; caf.addCqListener(cqListeners[i]);
