This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 8ddd865ee40958642a5f53039a45afe2f587c924 Author: Jinmei Liao <[email protected]> AuthorDate: Tue Nov 10 18:30:04 2020 -0800 GEODE-7675: Partitioned Region clear should be successful when clients are present with subscription enabled (#5727) --- .../internal/cache/PartitionRegionClearHATest.java | 236 +++++++++++++++++++++ .../test/concurrent/FileBasedCountDownLatch.java | 2 +- 2 files changed, 237 insertions(+), 1 deletion(-) diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java new file mode 100644 index 0000000..7d0db0a --- /dev/null +++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.concurrent.FileBasedCountDownLatch;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Verifies that clearing a partitioned region succeeds when a server is restarted while the clear
+ * is paused just before an {@code OP_UNLOCK_FOR_PR_CLEAR} message is processed, with two
+ * subscription-enabled clients connected: client1 has registered interest in all keys and client2
+ * runs a CQ.
+ */
+public class PartitionRegionClearHATest implements Serializable {
+  public static final String NAME = "testRegion";
+  protected static MemberVM locator;
+  protected static MemberVM server1, server2;
+  protected static ClientVM client1, client2;
+  protected static List<MemberVM> servers;
+
+  @ClassRule
+  public static ClusterStartupRule cluster = new ClusterStartupRule(5);
+
+  // Set to true by the CQ listener that setUp() installs inside client2; it is therefore read and
+  // reset inside client2 as well (see verifyCleared() and after()).
+  private static final AtomicBoolean clearEventReceived = new AtomicBoolean(false);
+
+  // Shared between the test and the observers installed on the servers; created in before().
+  private FileBasedCountDownLatch latch;
+
+  /**
+   * Sets up the cluster once for all the tests: starts one locator, two servers and two clients,
+   * and creates the region on all servers and clients. client1 registers interest in all keys;
+   * client2 executes a CQ whose listener records that a clear event was received.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    int locatorPort = locator.getPort();
+    server1 = cluster.startServerVM(1,
+        s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+    server2 = cluster.startServerVM(2,
+        s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+    servers = Arrays.asList(server1, server2);
+
+    client1 = cluster.startClientVM(3, cc -> {
+      cc.withLocatorConnection(locatorPort).withPoolSubscription(true);
+    });
+
+    client2 = cluster.startClientVM(4, cc -> {
+      cc.withLocatorConnection(locatorPort).withPoolSubscription(true);
+    });
+
+    // set client1 with a registered interest
+    client1.invoke(() -> {
+      Region<Object, Object> testRegion = ClusterStartupRule.getClientCache()
+          .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+          .create(NAME);
+      testRegion.registerInterestForAllKeys();
+    });
+
+    // set client2 with a CQ
+    client2.invoke(() -> {
+      ClusterStartupRule.getClientCache()
+          .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+          .create(NAME);
+
+      QueryService queryService =
+          ClusterStartupRule.getClientCache().getDefaultPool().getQueryService();
+      CqAttributesFactory cqaFactory = new CqAttributesFactory();
+      cqaFactory.addCqListener(new CqListener() {
+        @Override
+        public void onEvent(CqEvent aCqEvent) {
+          Operation baseOperation = aCqEvent.getBaseOperation();
+          if (baseOperation.isClear()) {
+            clearEventReceived.set(true);
+          }
+        }
+
+        @Override
+        public void onError(CqEvent aCqEvent) {}
+      });
+
+      CqQuery cqQuery =
+          queryService.newCq("select * from /" + NAME, cqaFactory.create());
+      cqQuery.execute();
+    });
+  }
+
+  /**
+   * Before each test, creates a fresh latch, installs a
+   * {@link PauseDuringClearDistributionMessageObserver} using that latch on both servers, and puts
+   * ten entries into the region through client1.
+   */
+  @Before
+  public void before() throws Exception {
+    latch = new FileBasedCountDownLatch(2);
+
+    MemberVM.invokeInEveryMember(() -> {
+      PauseDuringClearDistributionMessageObserver observer =
+          new PauseDuringClearDistributionMessageObserver();
+      observer.setLatch(latch);
+      DistributionMessageObserver.setInstance(observer);
+    }, server1, server2);
+
+    client1.invoke(() -> {
+      Region<Object, Object> testRegion = getClientRegion();
+      for (int i = 0; i < 10; i++) {
+        testRegion.put(i, "value" + i);
+      }
+    });
+  }
+
+  /**
+   * After each test, resets the clear-event flag and removes the DistributionMessageObserver from
+   * both servers.
+   */
+  @After
+  public void after() throws Exception {
+    // The flag is set by the CQ listener running inside client2, so it has to be reset there;
+    // otherwise a value left over from one test would satisfy the assertion of the next one.
+    client2.invoke(() -> clearEventReceived.set(false));
+    MemberVM.invokeInEveryMember(() -> {
+      DistributionMessageObserver.setInstance(null);
+    }, server1, server2);
+  }
+
+  @Test
+  public void restartServerThatIssuesClear() throws Exception {
+    // issue the clear from server1; the observer pauses it on the latch
+    server1.invokeAsync(() -> {
+      getRegion().clear();
+    });
+
+    // do not restart server1 before the observer code has been reached
+    await().until(() -> latch.currentValue() == 1);
+
+    // restart server1
+    server1.stop(false);
+    int locatorPort = locator.getPort();
+    server1 = cluster.startServerVM(1,
+        s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+
+    // release the paused observer
+    latch.countDown();
+    verifyCleared();
+  }
+
+  @Test
+  public void restartServerThatDoesNotIssueClear() throws Exception {
+    // issue the clear from server1; the observer pauses it on the latch
+    server1.invokeAsync(() -> {
+      getRegion().clear();
+    });
+
+    // do not restart server2 before the observer code has been reached
+    await().until(() -> latch.currentValue() == 1);
+
+    // restart server2
+    server2.stop(false);
+    int locatorPort = locator.getPort();
+    server2 = cluster.startServerVM(2,
+        s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+
+    // release the paused observer
+    latch.countDown();
+    verifyCleared();
+  }
+
+  /**
+   * Asserts that the region is empty on both clients and both servers, and that client2's CQ
+   * listener has seen a clear event.
+   */
+  private void verifyCleared() {
+    client1.invoke(() -> {
+      Region<Object, Object> testRegion = getClientRegion();
+      await().untilAsserted(() -> assertThat(testRegion.size()).isEqualTo(0));
+    });
+
+    client2.invoke(() -> {
+      // poll, as the client1 check above does, so that an event reaching this client slightly
+      // after the clear was issued is not missed
+      await().untilAsserted(() -> assertThat(clearEventReceived.get()).isTrue());
+      await().untilAsserted(() -> assertThat(getClientRegion().size()).isEqualTo(0));
+    });
+
+    MemberVM.invokeInEveryMember(() -> {
+      assertThat(getRegion().size()).isEqualTo(0);
+    }, server1, server2);
+  }
+
+  private static Region<Object, Object> getClientRegion() {
+    return ClusterStartupRule.getClientCache().getRegion("/" + NAME);
+  }
+
+  private static Region<Object, Object> getRegion() {
+    return ClusterStartupRule.getCache().getRegion("/" + NAME);
+  }
+
+  /**
+   * Observer that, before a {@link PartitionedRegionClearMessage} with operation
+   * {@code OP_UNLOCK_FOR_PR_CLEAR} is processed, counts the latch down once and then blocks until
+   * the latch reaches zero.
+   */
+  private static class PauseDuringClearDistributionMessageObserver
+      extends DistributionMessageObserver {
+    private static final Logger logger =
+        Logger.getLogger(PauseDuringClearDistributionMessageObserver.class.getName());
+
+    private FileBasedCountDownLatch latch;
+
+    public void setLatch(FileBasedCountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      if (message instanceof PartitionedRegionClearMessage) {
+        PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message;
+        if (clearMessage
+            .getOp() == PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
+          try {
+            // count down to 1 so that the test can go ahead and restart the server
+            latch.countDown();
+            // block until the count is 0 (the test must call countDown one more time)
+            latch.await();
+          } catch (Exception ex) {
+            // Message processing continues as before, but the failure is no longer hidden: if
+            // the pause did not happen, the test outcome cannot be trusted.
+            if (ex instanceof InterruptedException) {
+              Thread.currentThread().interrupt();
+            }
+            logger.log(Level.WARNING, "Unable to pause the clear on the test latch", ex);
+          }
+        }
+      }
+    }
+  }
+
+}
diff --git a/geode-junit/src/main/java/org/apache/geode/test/concurrent/FileBasedCountDownLatch.java b/geode-junit/src/main/java/org/apache/geode/test/concurrent/FileBasedCountDownLatch.java
index 0f77d4d..b928d62 100644
--- a/geode-junit/src/main/java/org/apache/geode/test/concurrent/FileBasedCountDownLatch.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/concurrent/FileBasedCountDownLatch.java
@@ -73,7 +73,7 @@ public class FileBasedCountDownLatch implements Serializable {
     GeodeAwaitility.await().until(this::currentValue, is(equalTo(0)));
   }
 
-  protected int currentValue() throws IOException {
+  public int currentValue() throws 
IOException { try (FileOutputStream out = new FileOutputStream(lockFile)) { java.nio.channels.FileLock lock = out.getChannel().lock(); try {
