This is an automated email from the ASF dual-hosted git repository. agingade pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7665 by this push: new e960021 GEODE-7678 (2nd PR) - Support for cache-listener and client-notification for Partitioned Region Clear operation (#5124) e960021 is described below commit e9600213fef5d6adb4fcef4e3ed655553fd0cc21 Author: agingade <aging...@pivotal.io> AuthorDate: Wed May 20 16:08:07 2020 -0700 GEODE-7678 (2nd PR) - Support for cache-listener and client-notification for Partitioned Region Clear operation (#5124) * GEODE-7678: Add support for cache listener and client notification for PR clear The changes are made to PR clear messaging and locking mechanism to preserve cache-listener and client-events ordering during concurrent cache operation while clear in progress. --- .../integrationTest/resources/assembly_content.txt | 1 + .../cache/PRCacheListenerDistributedTest.java | 250 +++++++++++- .../ReplicateCacheListenerDistributedTest.java | 111 +++++- ...ionedRegionAfterClearNotificationDUnitTest.java | 372 ++++++++++++++++++ .../cache/PartitionedRegionClearDUnitTest.java | 1 - ...titionedRegionClearWithExpirationDUnitTest.java | 69 ++-- ...itionedRegionClearWithExpirationDUnitTest.java} | 58 +-- .../cache/PartitionedRegionIntegrationTest.java | 45 +++ .../codeAnalysis/sanctionedDataSerializables.txt | 8 + .../PartitionedRegionPartialClearException.java | 37 ++ .../main/java/org/apache/geode/cache/Region.java | 4 +- .../org/apache/geode/internal/DSFIDFactory.java | 5 + .../apache/geode/internal/cache/BucketAdvisor.java | 2 +- .../apache/geode/internal/cache/BucketRegion.java | 17 +- .../internal/cache/DistributedClearOperation.java | 10 +- .../geode/internal/cache/DistributedRegion.java | 9 +- .../geode/internal/cache/InternalRegion.java | 3 + .../apache/geode/internal/cache/LocalRegion.java | 3 +- .../geode/internal/cache/PartitionedRegion.java | 217 ++--------- .../internal/cache/PartitionedRegionClear.java | 419 +++++++++++++++++++++ .../cache/PartitionedRegionClearMessage.java | 287 ++++++++++++++ .../internal/cache/PartitionedRegionDataStore.java | 8 + .../internal/cache/partitioned/RegionAdvisor.java | 11 + .../sanctioned-geode-core-serializables.txt | 2 + .../internal/cache/BucketRegionJUnitTest.java | 4 +- .../internal/cache/PartitionedRegionTest.java | 39 -- .../serialization/DataSerializableFixedID.java | 2 + 27 files changed, 1679 insertions(+), 315 deletions(-) diff --git a/geode-assembly/src/integrationTest/resources/assembly_content.txt b/geode-assembly/src/integrationTest/resources/assembly_content.txt index 5beafe5..ae5243c 100644 --- a/geode-assembly/src/integrationTest/resources/assembly_content.txt +++ b/geode-assembly/src/integrationTest/resources/assembly_content.txt @@ -221,6 +221,7 @@ javadoc/org/apache/geode/cache/PartitionAttributes.html javadoc/org/apache/geode/cache/PartitionAttributesFactory.html javadoc/org/apache/geode/cache/PartitionResolver.html javadoc/org/apache/geode/cache/PartitionedRegionDistributionException.html +javadoc/org/apache/geode/cache/PartitionedRegionPartialClearException.html javadoc/org/apache/geode/cache/PartitionedRegionStorageException.html javadoc/org/apache/geode/cache/Region.Entry.html javadoc/org/apache/geode/cache/Region.html diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java index 559def7..f4a9ac9 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java @@ -14,14 +14,21 @@ */ package org.apache.geode.cache; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.apache.geode.test.dunit.VM.getVMCount; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.Arrays; +import java.util.Collection; +import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.UseParametersRunnerFactory; +import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; /** @@ -38,28 +45,60 @@ import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactor @SuppressWarnings("serial") public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistributedTest { - @Parameters(name = "{index}: redundancy={0}") - public static Iterable<Integer> data() { - return Arrays.asList(0, 3); + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {1, Boolean.FALSE}, + {3, Boolean.TRUE}, + }); } @Parameter public int redundancy; + @Parameter(1) + public Boolean withData; + @Override protected Region<String, Integer> createRegion(final String name, final CacheListener<String, Integer> listener) { + return createPartitionedRegion(name, listener, false); + } + + protected Region<String, Integer> createAccessorRegion(final String name, + final CacheListener<String, Integer> listener) { + return createPartitionedRegion(name, listener, true); + } + + private Region<String, Integer> createPartitionedRegion(String name, + CacheListener<String, Integer> listener, boolean accessor) { + LogService.getLogger() + .info("Params [Redundancy: " + redundancy + " withData:" + withData + "]"); PartitionAttributesFactory<String, Integer> paf = new PartitionAttributesFactory<>(); paf.setRedundantCopies(redundancy); + if (accessor) { + paf.setLocalMaxMemory(0); + } RegionFactory<String, Integer> regionFactory = cacheRule.getCache().createRegionFactory(); - regionFactory.addCacheListener(listener); + if (listener != null) { + regionFactory.addCacheListener(listener); + } regionFactory.setDataPolicy(DataPolicy.PARTITION); regionFactory.setPartitionAttributes(paf.create()); return regionFactory.create(name); } + private void withData(Region region) { + if (withData) { + // Fewer buckets. + // Covers case where node doesn't have any buckets depending on redundancy. + region.put("key1", "value1"); + region.put("key2", "value2"); + } + } + @Override protected int expectedCreates() { return 1; @@ -79,4 +118,207 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri protected int expectedDestroys() { return 1; } + + @Test + public void afterRegionDestroyIsInvokedInEveryMember() { + CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, listener)); + }); + } + + region.destroyRegion(); + + assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(expectedRegionDestroys()); + } + + @Test + public void afterRegionDestroyIsInvokedOnNodeWithListener() { + CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, null)); + }); + } + + region.destroyRegion(); + + assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1); + } + + @Test + public void afterRegionDestroyIsInvokedOnRemoteNodeWithListener() { + CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, null); + + getVM(0).invoke(() -> { + createRegion(regionName, listener); + }); + + for (int i = 1; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, null)); + }); + } + + region.destroyRegion(); + + assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1); + } + + @Test + public void afterRegionDestroyIsInvokedOnAccessorAndDataMembers() { + CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener(); + Region<String, Integer> region = createAccessorRegion(regionName, listener); + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, listener)); + }); + } + + region.destroyRegion(); + + assertThat(sharedCountersRule.getTotal(REGION_DESTROY)) + .isGreaterThanOrEqualTo(expectedRegionDestroys()); + } + + @Test + public void afterRegionDestroyIsInvokedOnAccessor() { + CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener(); + Region<String, Integer> region = createAccessorRegion(regionName, listener); + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, null)); + }); + } + + region.destroyRegion(); + + assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1); + } + + @Test + public void afterRegionDestroyIsInvokedOnNonAccessor() { + CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener(); + Region<String, Integer> region = createAccessorRegion(regionName, null); + getVM(0).invoke(() -> { + createRegion(regionName, listener); + }); + for (int i = 1; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, null)); + }); + } + + region.destroyRegion(); + + assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1); + } + + @Test + public void afterRegionClearIsInvokedInEveryMember() { + CacheListener<String, Integer> listener = new ClearCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, listener)); + }); + } + + region.clear(); + + assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(expectedClears()); + } + + @Test + public void afterClearIsInvokedOnNodeWithListener() { + CacheListener<String, Integer> listener = new ClearCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, null)); + }); + } + + region.clear(); + + assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1); + } + + @Test + public void afterRegionClearIsInvokedOnRemoteNodeWithListener() { + CacheListener<String, Integer> listener = new ClearCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, null); + getVM(0).invoke(() -> { + createRegion(regionName, listener); + }); + for (int i = 1; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, null)); + }); + } + + region.clear(); + + assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1); + } + + @Test + public void afterRegionClearIsInvokedOnAccessorAndDataMembers() { + CacheListener<String, Integer> listener = new ClearCountingCacheListener(); + Region<String, Integer> region = createAccessorRegion(regionName, listener); + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, listener)); + }); + } + + region.clear(); + + assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(expectedClears()); + } + + @Test + public void afterRegionClearIsInvokedOnAccessor() { + CacheListener<String, Integer> listener = new ClearCountingCacheListener(); + Region<String, Integer> region = createAccessorRegion(regionName, listener); + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, null)); + }); + } + + region.clear(); + + assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1); + } + + @Test + public void afterRegionClearIsInvokedOnNonAccessor() { + CacheListener<String, Integer> listener = new ClearCountingCacheListener(); + Region<String, Integer> region = createAccessorRegion(regionName, null); + + getVM(0).invoke(() -> { + createRegion(regionName, listener); + }); + for (int i = 1; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, null)); + }); + } + + region.clear(); + + assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1); + } + } diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java index b0e1894..227c813 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java @@ -51,13 +51,15 @@ public class ReplicateCacheListenerDistributedTest implements Serializable { private static final String UPDATES = "UPDATES"; private static final String INVALIDATES = "INVALIDATES"; private static final String DESTROYS = "DESTROYS"; + protected static final String CLEAR = "CLEAR"; + protected static final String REGION_DESTROY = "REGION_DESTROY"; private static final int ENTRY_VALUE = 0; private static final int UPDATED_ENTRY_VALUE = 1; private static final String KEY = "key-1"; - private String regionName; + protected String regionName; @Rule public DistributedRule distributedRule = new DistributedRule(); @@ -82,6 +84,8 @@ public class ReplicateCacheListenerDistributedTest implements Serializable { sharedCountersRule.initialize(DESTROYS); sharedCountersRule.initialize(INVALIDATES); sharedCountersRule.initialize(UPDATES); + sharedCountersRule.initialize(CLEAR); + sharedCountersRule.initialize(REGION_DESTROY); } @Test @@ -148,6 +152,36 @@ public class ReplicateCacheListenerDistributedTest implements Serializable { assertThat(sharedCountersRule.getTotal(DESTROYS)).isEqualTo(expectedDestroys()); } + @Test + public void afterClearIsInvokedInEveryMember() { + CacheListener<String, Integer> listener = new ClearCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + createRegion(regionName, listener); + }); + } + + region.clear(); + + assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(expectedClears()); + } + + @Test + public void afterRegionDestroyIsInvokedInEveryMember() { + CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + createRegion(regionName, listener); + }); + } + + region.destroyRegion(); + + assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(expectedRegionDestroys()); + } + protected Region<String, Integer> createRegion(final String name, final CacheListener<String, Integer> listener) { RegionFactory<String, Integer> regionFactory = cacheRule.getCache().createRegionFactory(); @@ -174,6 +208,14 @@ public class ReplicateCacheListenerDistributedTest implements Serializable { return getVMCount() + 1; } + protected int expectedClears() { + return getVMCount() + 1; + } + + protected int expectedRegionDestroys() { + return getVMCount() + 1; + } + /** * Overridden within tests to increment shared counters. */ @@ -283,7 +325,12 @@ public class ReplicateCacheListenerDistributedTest implements Serializable { @Override public void afterCreate(final EntryEvent<String, Integer> event) { - // ignore + sharedCountersRule.increment(CREATES); + } + + @Override + public void afterUpdate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(UPDATES); } @Override @@ -302,4 +349,64 @@ public class ReplicateCacheListenerDistributedTest implements Serializable { errorCollector.checkThat(event.getNewValue(), nullValue()); } } + + protected class ClearCountingCacheListener extends BaseCacheListener { + + @Override + public void afterCreate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(CREATES); + } + + @Override + public void afterUpdate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(UPDATES); + } + + @Override + public void afterRegionClear(RegionEvent<String, Integer> event) { + + sharedCountersRule.increment(CLEAR); + if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) { + if (event.isOriginRemote()) { + errorCollector.checkThat(event.getDistributedMember(), + not(cacheRule.getSystem().getDistributedMember())); + } else { + errorCollector.checkThat(event.getDistributedMember(), + equalTo(cacheRule.getSystem().getDistributedMember())); + } + } + errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_CLEAR)); + errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName)); + } + } + + protected class RegionDestroyCountingCacheListener extends BaseCacheListener { + + @Override + public void afterCreate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(CREATES); + } + + @Override + public void afterUpdate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(UPDATES); + } + + @Override + public void afterRegionDestroy(final RegionEvent<String, Integer> event) { + sharedCountersRule.increment(REGION_DESTROY); + + if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) { + if (event.isOriginRemote()) { + errorCollector.checkThat(event.getDistributedMember(), + not(cacheRule.getSystem().getDistributedMember())); + } else { + errorCollector.checkThat(event.getDistributedMember(), + equalTo(cacheRule.getSystem().getDistributedMember())); + } + } + errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_DESTROY)); + errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName)); + } + } } diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java new file mode 100644 index 0000000..237b6a8 --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 100; + + protected int locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1; + protected MemberVM dataStore2; + protected MemberVM dataStore3; + protected MemberVM accessor; + + protected ClientVM client1; + protected ClientVM client2; + + private static volatile DUnitBlackboard blackboard; + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(7); + + @Before + public void setUp() throws Exception { + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort); + dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort); + dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort); + accessor = cluster.startServerVM(4, getProperties(), locatorPort); + + client1 = cluster.startClientVM(5, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + client2 = cluster.startClientVM(6, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + + dataStore1.invoke(this::initDataStore); + dataStore2.invoke(this::initDataStore); + dataStore3.invoke(this::initDataStore); + accessor.invoke(this::initAccessor); + + getBlackboard().initBlackboard(); + } + + protected RegionShortcut getRegionShortCut() { + return RegionShortcut.PARTITION_REDUNDANT; + } + + protected Properties getProperties() { + Properties properties = new Properties(); + return properties; + } + + private Region getRegion(boolean isClient) { + if (isClient) { + return getClientCache().getRegion(REGION_NAME); + } else { + return getCache().getRegion(REGION_NAME); + } + } + + private void verifyRegionSize(boolean isClient, int expectedNum) { + GeodeAwaitility.await() + .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum)); + } + + private void initClientCache() { + Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) + .create(REGION_NAME); + region.registerInterestForAllKeys(InterestResultPolicy.KEYS); + } + + private void stopServers() { + List<CacheServer> cacheServers = getCache().getCacheServers(); + for (CacheServer server : cacheServers) { + server.stop(); + } + } + + private void initDataStore() { + getCache().createRegionFactory(getRegionShortCut()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void initAccessor() { + RegionShortcut shortcut = getRegionShortCut(); + getCache().createRegionFactory(shortcut) + .setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void feed(boolean isClient) { + Region region = getRegion(isClient); + IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i)); + } + + private void verifyServerRegionSize(int expectedNum) { + accessor.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore1.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore2.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore3.invoke(() -> verifyRegionSize(false, expectedNum)); + } + + private void verifyClientRegionSize(int expectedNum) { + client1.invoke(() -> verifyRegionSize(true, expectedNum)); + client2.invoke(() -> verifyRegionSize(true, expectedNum)); + } + + private void verifyCacheListenerTriggerCount(MemberVM serverVM) { + SerializableCallableIF<Integer> getListenerTriggerCount = () -> { + CountingCacheListener countingCacheListener = + (CountingCacheListener) getRegion(false).getAttributes() + .getCacheListeners()[0]; + return countingCacheListener.getClears(); + }; + + int count = accessor.invoke(getListenerTriggerCount) + + dataStore1.invoke(getListenerTriggerCount) + + dataStore2.invoke(getListenerTriggerCount) + + dataStore3.invoke(getListenerTriggerCount); + assertThat(count).isEqualTo(4); + + if (serverVM != null) { + assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1); + } + } + + @Test + public void invokeClearOnDataStoreAndVerifyListenerCount() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test + public void invokeClearOnAccessorAndVerifyListenerCount() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + accessor.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(accessor); + } + + @Test + public void invokeClearFromClientAndVerifyListenerCount() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void invokeClearFromClientWithAccessorAsServer() { + dataStore1.invoke(this::stopServers); + dataStore2.invoke(this::stopServers); + dataStore3.invoke(this::stopServers); + + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void invokeClearFromDataStoreWithClientInterest() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test(expected = AssertionError.class) + public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembers() + throws Exception { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + dataStore2.invoke(() -> DistributionMessageObserver.setInstance( + testHookToKillMemberCallingClearBeforeMessageProcessed())); + + AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear()); + + getBlackboard().waitForGate("CLOSE_CACHE", 30, SECONDS); + + dataStore1.invoke(() -> getCache().close()); + getBlackboard().signalGate("CACHE_CLOSED"); + + // This should not be blocked. + dataStore2.invoke(() -> feed(false)); + dataStore3.invoke(() -> feed(false)); + + dataStore2.invoke(() -> verifyRegionSize(false, NUM_ENTRIES)); + dataStore3.invoke(() -> verifyRegionSize(false, NUM_ENTRIES)); + + ds1ClearAsync.await(); + } + + @Test + public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembersAfterMessageProcessed() + throws Exception { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore2.invoke(() -> DistributionMessageObserver.setInstance( + testHookToKillMemberCallingClearAfterMessageProcessed())); + + AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear()); + + getBlackboard().waitForGate("CLOSE_CACHE", 30, SECONDS); + + dataStore1.invoke(() -> getCache().close()); + getBlackboard().signalGate("CACHE_CLOSED"); + + // This should not be blocked. + dataStore2.invoke(() -> feed(false)); + dataStore3.invoke(() -> feed(false)); + + dataStore2.invoke(() -> verifyRegionSize(false, NUM_ENTRIES)); + dataStore3.invoke(() -> verifyRegionSize(false, NUM_ENTRIES)); + + ds1ClearAsync.await(); + } + + + private static class CountingCacheListener extends CacheListenerAdapter { + private final AtomicInteger clears = new AtomicInteger(); + + @Override + public void afterRegionClear(RegionEvent event) { + clears.incrementAndGet(); + } + + int getClears() { + return clears.get(); + + } + } + + private DistributionMessageObserver testHookToKillMemberCallingClearBeforeMessageProcessed() { + return new DistributionMessageObserver() { + + @Override + public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) { + if (message instanceof PartitionedRegionClearMessage) { + if (((PartitionedRegionClearMessage) message) + .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) { + DistributionMessageObserver.setInstance(null); + getBlackboard().signalGate("CLOSE_CACHE"); + try { + getBlackboard().waitForGate("CACHE_CLOSED", 30, SECONDS); + GeodeAwaitility.await().untilAsserted( + () -> assertThat(dm.isCurrentMember(message.getSender())).isFalse()); + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException("Failed waiting for signal."); + } + } + } + } + }; + } + + private DistributionMessageObserver testHookToKillMemberCallingClearAfterMessageProcessed() { + return new DistributionMessageObserver() { + @Override + public void afterProcessMessage(ClusterDistributionManager dm, DistributionMessage message) { + if (message instanceof PartitionedRegionClearMessage) { + if (((PartitionedRegionClearMessage) message) + .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) { + DistributionMessageObserver.setInstance(null); + getBlackboard().signalGate("CLOSE_CACHE"); + try { + getBlackboard().waitForGate("CACHE_CLOSED", 30, SECONDS); + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException("Failed waiting for signal."); + } + } + } + } + }; + } + + private static DUnitBlackboard getBlackboard() { + if (blackboard == null) { + blackboard = new DUnitBlackboard(); + } + return blackboard; + } + +} diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java index e2e04eb..a3b311c 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java @@ -80,7 +80,6 @@ public class PartitionedRegionClearDUnitTest implements Serializable { protected Properties getProperties() { Properties properties = new Properties(); - properties.setProperty("log-level", "info"); return properties; } diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java index 7f3dff9..dfc9470 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java @@ -17,12 +17,8 @@ package org.apache.geode.internal.cache; import static org.apache.geode.cache.ExpirationAction.DESTROY; import static org.apache.geode.cache.RegionShortcut.PARTITION; import static org.apache.geode.cache.RegionShortcut.PARTITION_OVERFLOW; -import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT; -import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT_OVERFLOW; import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT; import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; -import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT; -import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW; import static org.apache.geode.internal.util.ArrayUtils.asList; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.apache.geode.test.dunit.VM.getVM; @@ -53,6 +49,7 @@ import org.apache.geode.cache.CacheWriterException; import org.apache.geode.cache.ExpirationAttributes; import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.PartitionedRegionPartialClearException; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionEvent; import org.apache.geode.cache.RegionShortcut; @@ -75,7 +72,8 @@ import org.apache.geode.test.dunit.rules.DistributedRule; @RunWith(JUnitParamsRunner.class) public class PartitionedRegionClearWithExpirationDUnitTest implements Serializable { private static final Integer BUCKETS = 13; - private static final Integer EXPIRATION_TIME = 30; + private static final Integer EXPIRATION_TIME = 5 * 60; + private static final Integer SMALL_EXPIRATION_TIME = 10; private static final String REGION_NAME = "PartitionedRegion"; @Rule @@ -106,11 +104,6 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab PARTITION_OVERFLOW, PARTITION_REDUNDANT, PARTITION_REDUNDANT_OVERFLOW, - - PARTITION_PERSISTENT, - PARTITION_PERSISTENT_OVERFLOW, - PARTITION_REDUNDANT_PERSISTENT, - PARTITION_REDUNDANT_PERSISTENT_OVERFLOW }; } @@ -134,26 +127,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab accessor = getVM(TestVM.ACCESSOR.vmNumber); } - private RegionShortcut getRegionAccessorShortcut(RegionShortcut dataStoreRegionShortcut) { - if (dataStoreRegionShortcut.isPersistent()) { - switch (dataStoreRegionShortcut) { - case PARTITION_PERSISTENT: - return PARTITION; - case PARTITION_PERSISTENT_OVERFLOW: - return PARTITION_OVERFLOW; - case PARTITION_REDUNDANT_PERSISTENT: - return PARTITION_REDUNDANT; - case PARTITION_REDUNDANT_PERSISTENT_OVERFLOW: - return PARTITION_REDUNDANT_OVERFLOW; - } - } - - return dataStoreRegionShortcut; - } - private void initAccessor(RegionShortcut regionShortcut, ExpirationAttributes expirationAttributes) { - RegionShortcut accessorShortcut = getRegionAccessorShortcut(regionShortcut); PartitionAttributes<String, String> attributes = new PartitionAttributesFactory<String, String>() .setTotalNumBuckets(BUCKETS) @@ -161,7 +136,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab .create(); cacheRule.getCache() - .<String, String>createRegionFactory(accessorShortcut) + .<String, String>createRegionFactory(regionShortcut) .setPartitionAttributes(attributes) .setEntryTimeToLive(expirationAttributes) .setEntryIdleTimeout(expirationAttributes) @@ -281,6 +256,19 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab })); } + private void doClear() { + Cache cache = cacheRule.getCache(); + boolean retry; + do { + retry = false; + try { + cache.getRegion(REGION_NAME).clear(); + } catch (PartitionedRegionPartialClearException | CacheWriterException ex) { + retry = true; + } + } while (retry); + } + /** * The test does the following (clear coordinator and region type are parametrized): * - Populates the Partition Region (entries have expiration). @@ -303,10 +291,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab populateRegion(accessor, entries, asList(accessor, server1, server2)); // Clear the region. - getVM(coordinatorVM.vmNumber).invoke(() -> { - Cache cache = cacheRule.getCache(); - cache.getRegion(REGION_NAME).clear(); - }); + getVM(coordinatorVM.vmNumber).invoke(() -> doClear()); // Assert all expiration tasks were cancelled and none were executed. asList(server1, server2).forEach(vm -> vm.invoke(() -> { @@ -323,7 +308,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab // Assert Region Buckets are consistent and region is empty, accessor.invoke(this::assertRegionBucketsConsistency); - assertRegionIsEmpty(asList(accessor, server1, server1)); + assertRegionIsEmpty(asList(accessor, server1, server2)); } /** @@ -344,7 +329,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab public void clearShouldFailWhenCoordinatorMemberIsBouncedAndExpirationTasksShouldSurvive( RegionShortcut regionShortcut) { final int entries = 1000; - ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY); + ExpirationAttributes expirationAttributes = + new ExpirationAttributes(SMALL_EXPIRATION_TIME, DESTROY); parametrizedSetup(regionShortcut, expirationAttributes); populateRegion(accessor, entries, asList(accessor, server1, server2)); registerVMKillerAsCacheWriter(Collections.singletonList(server1)); @@ -408,22 +394,21 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})") public void clearShouldSucceedAndRemoveRegisteredExpirationTasksWhenNonCoordinatorMemberIsBounced( TestVM coordinatorVM, RegionShortcut regionShortcut) { - final int entries = 1500; + final int entries = 500; + + RegionShortcut rs = regionShortcut; ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY); parametrizedSetup(regionShortcut, expirationAttributes); registerVMKillerAsCacheWriter(Collections.singletonList(server2)); populateRegion(accessor, entries, asList(accessor, server1, server2)); // Clear the region. - getVM(coordinatorVM.vmNumber).invoke(() -> { - Cache cache = cacheRule.getCache(); - cache.getRegion(REGION_NAME).clear(); - }); + getVM(coordinatorVM.vmNumber).invoke(() -> doClear()); // Wait for member to get back online and assign buckets. server2.invoke(() -> { cacheRule.createCache(); - initDataStore(regionShortcut, expirationAttributes); + initDataStore(rs, expirationAttributes); await().untilAsserted( () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull()); PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME)); @@ -460,7 +445,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab // Assert Region Buckets are consistent and region is empty, accessor.invoke(this::assertRegionBucketsConsistency); - assertRegionIsEmpty(asList(accessor, server1, server1)); + assertRegionIsEmpty(asList(accessor, server1, server2)); } /** diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PersistentPartitionedRegionClearWithExpirationDUnitTest.java similarity index 93% copy from geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java copy to geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PersistentPartitionedRegionClearWithExpirationDUnitTest.java index 7f3dff9..f6f25bd 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PersistentPartitionedRegionClearWithExpirationDUnitTest.java @@ -53,6 +53,7 @@ import org.apache.geode.cache.CacheWriterException; import org.apache.geode.cache.ExpirationAttributes; import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.PartitionedRegionPartialClearException; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionEvent; import org.apache.geode.cache.RegionShortcut; @@ -73,9 +74,10 @@ import org.apache.geode.test.dunit.rules.DistributedRule; * on the {@link PartitionedRegion} once the operation is executed. */ @RunWith(JUnitParamsRunner.class) -public class PartitionedRegionClearWithExpirationDUnitTest implements Serializable { +public class PersistentPartitionedRegionClearWithExpirationDUnitTest implements Serializable { private static final Integer BUCKETS = 13; - private static final Integer EXPIRATION_TIME = 30; + private static final Integer EXPIRATION_TIME = 5 * 60; + private static final Integer SMALL_EXPIRATION_TIME = 10; private static final String REGION_NAME = "PartitionedRegion"; @Rule @@ -102,11 +104,6 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab @SuppressWarnings("unused") static RegionShortcut[] regionTypes() { return new RegionShortcut[] { - PARTITION, - PARTITION_OVERFLOW, - PARTITION_REDUNDANT, - PARTITION_REDUNDANT_OVERFLOW, - PARTITION_PERSISTENT, PARTITION_PERSISTENT_OVERFLOW, PARTITION_REDUNDANT_PERSISTENT, @@ -281,6 +278,19 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab })); } + private void doClear() { + Cache cache = cacheRule.getCache(); + boolean retry; + do { + retry = false; + try { + cache.getRegion(REGION_NAME).clear(); + } catch (PartitionedRegionPartialClearException | CacheWriterException ex) { + retry = true; + } + } while (retry); + } + /** * The test does the following (clear coordinator and region type are parametrized): * - Populates the Partition Region (entries have expiration). @@ -303,10 +313,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab populateRegion(accessor, entries, asList(accessor, server1, server2)); // Clear the region. - getVM(coordinatorVM.vmNumber).invoke(() -> { - Cache cache = cacheRule.getCache(); - cache.getRegion(REGION_NAME).clear(); - }); + getVM(coordinatorVM.vmNumber).invoke(() -> doClear()); // Assert all expiration tasks were cancelled and none were executed. asList(server1, server2).forEach(vm -> vm.invoke(() -> { @@ -323,7 +330,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab // Assert Region Buckets are consistent and region is empty, accessor.invoke(this::assertRegionBucketsConsistency); - assertRegionIsEmpty(asList(accessor, server1, server1)); + assertRegionIsEmpty(asList(accessor, server1, server2)); } /** @@ -344,7 +351,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab public void clearShouldFailWhenCoordinatorMemberIsBouncedAndExpirationTasksShouldSurvive( RegionShortcut regionShortcut) { final int entries = 1000; - ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY); + ExpirationAttributes expirationAttributes = + new ExpirationAttributes(SMALL_EXPIRATION_TIME, DESTROY); parametrizedSetup(regionShortcut, expirationAttributes); populateRegion(accessor, entries, asList(accessor, server1, server2)); registerVMKillerAsCacheWriter(Collections.singletonList(server1)); @@ -407,23 +415,29 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab @Parameters(method = "vmsAndRegionTypes") @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})") public void clearShouldSucceedAndRemoveRegisteredExpirationTasksWhenNonCoordinatorMemberIsBounced( - TestVM coordinatorVM, RegionShortcut regionShortcut) { - final int entries = 1500; + TestVM coordinatorVM, RegionShortcut regionShortcut) throws Exception { + final int entries = 500; + // To avoid partition offline exception without redundancy. + + if (regionShortcut == PARTITION_PERSISTENT) { + regionShortcut = PARTITION_REDUNDANT_PERSISTENT; + } else if (regionShortcut == PARTITION_PERSISTENT_OVERFLOW) { + regionShortcut = PARTITION_REDUNDANT_PERSISTENT_OVERFLOW; + } + + final RegionShortcut rs = regionShortcut; ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY); parametrizedSetup(regionShortcut, expirationAttributes); registerVMKillerAsCacheWriter(Collections.singletonList(server2)); populateRegion(accessor, entries, asList(accessor, server1, server2)); // Clear the region. - getVM(coordinatorVM.vmNumber).invoke(() -> { - Cache cache = cacheRule.getCache(); - cache.getRegion(REGION_NAME).clear(); - }); + getVM(coordinatorVM.vmNumber).invoke(() -> doClear()); // Wait for member to get back online and assign buckets. server2.invoke(() -> { cacheRule.createCache(); - initDataStore(regionShortcut, expirationAttributes); + initDataStore(rs, expirationAttributes); await().untilAsserted( () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull()); PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME)); @@ -459,8 +473,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab }); // Assert Region Buckets are consistent and region is empty, - accessor.invoke(this::assertRegionBucketsConsistency); - assertRegionIsEmpty(asList(accessor, server1, server1)); + // accessor.invoke(this::assertRegionBucketsConsistency); + assertRegionIsEmpty(asList(accessor, server1, server2)); } /** diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java index 818a855..933bc39 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java @@ -16,15 +16,24 @@ package org.apache.geode.internal.cache; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import org.junit.Rule; import org.junit.Test; +import org.apache.geode.cache.CacheEvent; import org.apache.geode.cache.EvictionAction; import org.apache.geode.cache.EvictionAttributes; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache30.TestCacheListener; import org.apache.geode.test.junit.rules.ServerStarterRule; public class PartitionedRegionIntegrationTest { @@ -55,4 +64,40 @@ public class PartitionedRegionIntegrationTest { ScheduledExecutorService bucketSorter = region.getBucketSorter(); assertThat(bucketSorter).isNull(); } + + @Test + public void prClearWithDataInvokesCacheListenerAfterClear() { + TestCacheListener prCacheListener = new TestCacheListener() {}; + TestCacheListener spyPRCacheListener = spy(prCacheListener); + + Region region = server.createPartitionRegion("PR1", + f -> f.addCacheListener(spyPRCacheListener), f -> f.setTotalNumBuckets(2)); + region.put("key1", "value2"); + region.put("key2", "value2"); + spyPRCacheListener.enableEventHistory(); + + region.clear(); + + verify(spyPRCacheListener, times(1)).afterRegionClear(any()); + List cacheEvents = spyPRCacheListener.getEventHistory(); + assertThat(cacheEvents.size()).isEqualTo(1); + assertThat(((CacheEvent) cacheEvents.get(0)).getOperation()).isEqualTo(Operation.REGION_CLEAR); + } + + @Test + public void prClearWithoutDataInvokesCacheListenerAfterClear() { + TestCacheListener prCacheListener = new TestCacheListener() {}; + TestCacheListener spyPRCacheListener = spy(prCacheListener); + + Region region = server.createPartitionRegion("PR1", + f -> f.addCacheListener(spyPRCacheListener), f -> f.setTotalNumBuckets(2)); + spyPRCacheListener.enableEventHistory(); + + region.clear(); + + verify(spyPRCacheListener, times(1)).afterRegionClear(any()); + List cacheEvents = spyPRCacheListener.getEventHistory(); + assertThat(cacheEvents.size()).isEqualTo(1); + assertThat(((CacheEvent) cacheEvents.get(0)).getOperation()).isEqualTo(Operation.REGION_CLEAR); + } } diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index cf47e73..8a08d63 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1154,6 +1154,14 @@ org/apache/geode/internal/cache/PartitionRegionConfig,2 fromData,207 toData,178 +org/apache/geode/internal/cache/PartitionedRegionClearMessage,2 +fromData,40 +toData,36 + +org/apache/geode/internal/cache/PartitionedRegionClearMessage$PartitionedRegionClearReplyMessage,2 +fromData,29 +toData,28 + org/apache/geode/internal/cache/PoolFactoryImpl$PoolAttributes,2 fromData,161 toData,161 diff --git a/geode-core/src/main/java/org/apache/geode/cache/PartitionedRegionPartialClearException.java b/geode-core/src/main/java/org/apache/geode/cache/PartitionedRegionPartialClearException.java new file mode 100644 index 0000000..1ddb301 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/cache/PartitionedRegionPartialClearException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache; + +/** + * Indicates a failure to perform a distributed clear operation on a Partitioned Region + * after multiple attempts. The clear may not have been successfully applied on some of + * the members hosting the region. + */ +public class PartitionedRegionPartialClearException extends CacheRuntimeException { + + public PartitionedRegionPartialClearException() {} + + public PartitionedRegionPartialClearException(String msg) { + super(msg); + } + + public PartitionedRegionPartialClearException(String msg, Throwable cause) { + super(msg, cause); + } + + public PartitionedRegionPartialClearException(Throwable cause) { + super(cause); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/cache/Region.java b/geode-core/src/main/java/org/apache/geode/cache/Region.java index 6413113..bed7053 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/Region.java +++ b/geode-core/src/main/java/org/apache/geode/cache/Region.java @@ -1302,7 +1302,9 @@ public interface Region<K, V> extends ConcurrentMap<K, V> { * @see java.util.Map#clear() * @see CacheListener#afterRegionClear * @see CacheWriter#beforeRegionClear - * @throws UnsupportedOperationException If the region is a partitioned region + * @throws PartitionedRegionPartialClearException when data is partially cleared on partitioned + * region. It is caller responsibility to handle the partial data clear either by retrying + * the clear operation or continue working with the partially cleared partitioned region. */ @Override void clear(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index c97f391..7bd4da3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -235,6 +235,7 @@ import org.apache.geode.internal.cache.MemberFunctionStreamingMessage; import org.apache.geode.internal.cache.Node; import org.apache.geode.internal.cache.PRQueryProcessor; import org.apache.geode.internal.cache.PartitionRegionConfig; +import org.apache.geode.internal.cache.PartitionedRegionClearMessage; import org.apache.geode.internal.cache.PreferBytesCachedDeserializable; import org.apache.geode.internal.cache.RegionEventImpl; import org.apache.geode.internal.cache.ReleaseClearLockMessage; @@ -679,6 +680,10 @@ public class DSFIDFactory implements DataSerializableFixedID { serializer.registerDSFID(PR_DUMP_B2N_REPLY_MESSAGE, DumpB2NReplyMessage.class); serializer.registerDSFID(DESTROY_PARTITIONED_REGION_MESSAGE, DestroyPartitionedRegionMessage.class); + serializer.registerDSFID(CLEAR_PARTITIONED_REGION_MESSAGE, + PartitionedRegionClearMessage.class); + serializer.registerDSFID(CLEAR_PARTITIONED_REGION_REPLY_MESSAGE, + PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage.class); serializer.registerDSFID(INVALIDATE_PARTITIONED_REGION_MESSAGE, InvalidatePartitionedRegionMessage.class); serializer.registerDSFID(COMMIT_PROCESS_QUERY_MESSAGE, CommitProcessQueryMessage.class); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java index e4045c3..6cba754 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java @@ -1622,7 +1622,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor { /** * Returns true if the a primary is known. */ - private boolean hasPrimary() { + protected boolean hasPrimary() { final byte primaryState = this.primaryState; return primaryState == OTHER_PRIMARY_NOT_HOSTING || primaryState == OTHER_PRIMARY_HOSTING || primaryState == IS_PRIMARY_HOSTING; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index e4fa7ef..c37e1a3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -575,8 +575,13 @@ public class BucketRegion extends DistributedRegion implements Bucket { // get rvvLock Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion(); + boolean isLockedAlready = this.partitionedRegion.getPartitionedRegionClear() + .isLockedForListenerAndClientNotification(); + try { - obtainWriteLocksForClear(regionEvent, participants); + if (!isLockedAlready) { + obtainWriteLocksForClear(regionEvent, participants); + } // no need to dominate my own rvv. // Clear is on going here, there won't be GII for this member clearRegionLocally(regionEvent, cacheWrite, null); @@ -584,7 +589,9 @@ public class BucketRegion extends DistributedRegion implements Bucket { // TODO: call reindexUserDataRegion if there're lucene indexes } finally { - releaseWriteLocksForClear(regionEvent, participants); + if (!isLockedAlready) { + releaseWriteLocksForClear(regionEvent, participants); + } } } @@ -2493,4 +2500,10 @@ public class BucketRegion extends DistributedRegion implements Bucket { void checkSameSenderIdsAvailableOnAllNodes() { // nothing needed on a bucket region } + + @Override + protected void basicClear(RegionEventImpl regionEvent) { + basicClear(regionEvent, false); + } + } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java index 4396581..25cc2f5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java @@ -207,7 +207,7 @@ public class DistributedClearOperation extends DistributedCacheOperation { protected boolean operateOnRegion(CacheEvent event, ClusterDistributionManager dm) throws EntryNotFoundException { - DistributedRegion region = (DistributedRegion) event.getRegion(); + LocalRegion region = (LocalRegion) event.getRegion(); switch (this.clearOp) { case OP_CLEAR: region.clearRegionLocally((RegionEventImpl) event, false, this.rvv); @@ -215,9 +215,11 @@ public class DistributedClearOperation extends DistributedCacheOperation { this.appliedOperation = true; break; case OP_LOCK_FOR_CLEAR: - if (region.getDataPolicy().withStorage()) { - DistributedClearOperation.regionLocked(this.getSender(), region.getFullPath(), region); - region.lockLocallyForClear(dm, this.getSender(), event); + if (region.getDataPolicy().withStorage() && region instanceof DistributedRegion) { + DistributedRegion distributedRegion = (DistributedRegion) region; + DistributedClearOperation.regionLocked(this.getSender(), region.getFullPath(), + distributedRegion); + distributedRegion.lockLocallyForClear(dm, this.getSender(), event); } this.appliedOperation = true; break; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 0eeaee2..091a24f 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -2132,7 +2132,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute */ protected void releaseWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) { + releaseLockLocallyForClear(regionEvent); + if (!isUsedForPartitionedRegionBucket()) { + DistributedClearOperation.releaseLocks(regionEvent, participants); + } + } + protected void releaseLockLocallyForClear(RegionEventImpl regionEvent) { ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook(); if (armLockTestHook != null) { armLockTestHook.beforeRelease(this, regionEvent); @@ -2142,9 +2148,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute if (rvv != null) { rvv.unlockForClear(getMyId()); } - if (!isUsedForPartitionedRegionBucket()) { - DistributedClearOperation.releaseLocks(regionEvent, participants); - } if (armLockTestHook != null) { armLockTestHook.afterRelease(this, regionEvent); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java index 4ae752b..7e13f06 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java @@ -463,4 +463,7 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo boolean isRegionCreateNotified(); void setRegionCreateNotified(boolean notified); + + void clearRegionLocally(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 83072a3..ff42fc2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -8442,7 +8442,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * will not take distributedLock. The clear operation will also clear the local transactional * entries. The clear operation will have immediate committed state. */ - void clearRegionLocally(RegionEventImpl regionEvent, boolean cacheWrite, + @Override + public void clearRegionLocally(RegionEventImpl regionEvent, boolean cacheWrite, RegionVersionVector vector) { final boolean isRvvDebugEnabled = logger.isTraceEnabled(LogMarker.RVV_VERBOSE); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 3bc52ae..2fab2dd 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -321,6 +321,8 @@ public class PartitionedRegion extends LocalRegion } }; + private final PartitionedRegionClear partitionedRegionClear = new PartitionedRegionClear(this); + /** * Global Region for storing PR config ( PRName->PRConfig). This region would be used to resolve * PR name conflict.* @@ -2148,198 +2150,6 @@ public class PartitionedRegion extends LocalRegion throw new UnsupportedOperationException(); } - @Override - void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) { - final boolean isDebugEnabled = logger.isDebugEnabled(); - synchronized (clearLock) { - final DistributedLockService lockService = getPartitionedRegionLockService(); - try { - lockService.lock("_clearOperation" + this.getFullPath().replace('/', '_'), -1, -1); - } catch (IllegalStateException e) { - lockCheckReadiness(); - throw e; - } - try { - if (cache.isCacheAtShutdownAll()) { - throw cache.getCacheClosedException("Cache is shutting down"); - } - - // do cacheWrite - cacheWriteBeforeRegionClear(regionEvent); - - // create ClearPRMessage per bucket - List<ClearPRMessage> clearMsgList = createClearPRMessages(regionEvent.getEventId()); - for (ClearPRMessage clearPRMessage : clearMsgList) { - int bucketId = clearPRMessage.getBucketId(); - checkReadiness(); - long sendMessagesStartTime = 0; - if (isDebugEnabled) { - sendMessagesStartTime = System.currentTimeMillis(); - } - try { - sendClearMsgByBucket(bucketId, clearPRMessage); - } catch (PartitionOfflineException poe) { - // TODO add a PartialResultException - logger.info("PR.sendClearMsgByBucket encountered PartitionOfflineException at bucket " - + bucketId, poe); - } catch (Exception e) { - logger.info("PR.sendClearMsgByBucket encountered exception at bucket " + bucketId, e); - } - - if (isDebugEnabled) { - long now = System.currentTimeMillis(); - logger.debug("PR.sendClearMsgByBucket for bucket {} took {} ms", bucketId, - (now - sendMessagesStartTime)); - } - // TODO add psStats - } - } finally { - try { - lockService.unlock("_clearOperation" + this.getFullPath().replace('/', '_')); - } catch (IllegalStateException e) { - lockCheckReadiness(); - } - } - - // notify bridge clients at PR level - regionEvent.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR); - boolean hasListener = hasListener(); - if (hasListener) { - dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent); - } - notifyBridgeClients(regionEvent); - logger.info("Partitioned region {} finsihed clear operation.", this.getFullPath()); - } - } - - void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) { - RetryTimeKeeper retryTime = null; - InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null); - if (logger.isDebugEnabled()) { - logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId, - currentTarget); - } - - long timeOut = 0; - int count = 0; - while (true) { - switch (count) { - case 0: - // Note we don't check for DM cancellation in common case. - // First time. Assume success, keep going. - break; - case 1: - this.cache.getCancelCriterion().checkCancelInProgress(null); - // Second time (first failure). Calculate timeout and keep going. - timeOut = System.currentTimeMillis() + this.retryTimeout; - break; - default: - this.cache.getCancelCriterion().checkCancelInProgress(null); - // test for timeout - long timeLeft = timeOut - System.currentTimeMillis(); - if (timeLeft < 0) { - PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket" + bucketId, - this.retryTimeout); - // NOTREACHED - } - - // Didn't time out. Sleep a bit and then continue - boolean interrupted = Thread.interrupted(); - try { - Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); - } catch (InterruptedException ignore) { - interrupted = true; - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - break; - } // switch - count++; - - if (currentTarget == null) { // pick target - checkReadiness(); - if (retryTime == null) { - retryTime = new RetryTimeKeeper(this.retryTimeout); - } - - currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false); - if (currentTarget == null) { - // the bucket does not exist, no need to clear - logger.info("Bucket " + bucketId + " does not contain data, no need to clear"); - return; - } else { - if (logger.isDebugEnabled()) { - logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget); - } - } - - // It's possible this is a GemFire thread e.g. ServerConnection - // which got to this point because of a distributed system shutdown or - // region closure which uses interrupt to break any sleep() or wait() calls - // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception - checkShutdown(); - continue; - } // pick target - - boolean result = false; - try { - final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId()); - if (isLocal) { - result = clearPRMessage.doLocalClear(this); - } else { - ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this); - if (response != null) { - this.prStats.incPartitionMessagesSent(); - result = response.waitForResult(); - } - } - if (result) { - return; - } - } catch (ForceReattemptException fre) { - checkReadiness(); - InternalDistributedMember lastTarget = currentTarget; - if (retryTime == null) { - retryTime = new RetryTimeKeeper(this.retryTimeout); - } - currentTarget = getNodeForBucketWrite(bucketId, retryTime); - if (lastTarget.equals(currentTarget)) { - if (logger.isDebugEnabled()) { - logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}", - currentTarget, fre.getMessage()); - } - if (retryTime.overMaximum()) { - PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket", - this.retryTimeout); - // NOTREACHED - } - retryTime.waitToRetryNode(); - } else { - if (logger.isDebugEnabled()) { - logger.debug("PR.sendClearMsgByBucket: Old target was {}, Retrying {}", lastTarget, - currentTarget); - } - } - } - - // It's possible this is a GemFire thread e.g. ServerConnection - // which got to this point because of a distributed system shutdown or - // region closure which uses interrupt to break any sleep() or wait() - // calls - // e.g. waitForPrimary or waitForBucketRecovery in which case throw - // exception - checkShutdown(); - - // If we get here, the attempt failed... - if (count == 1) { - // TODO prStats add ClearPRMsg retried - this.prStats.incPutAllMsgsRetried(); - } - } - } - List<ClearPRMessage> createClearPRMessages(EventID eventID) { ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>(); for (int bucketId = 0; bucketId < getTotalNumberOfBuckets(); bucketId++) { @@ -10412,4 +10222,27 @@ public class PartitionedRegion extends LocalRegion this.getSystem().handleResourceEvent(ResourceEvent.REGION_CREATE, this); this.regionCreationNotified = true; } + + protected PartitionedRegionClear getPartitionedRegionClear() { + return partitionedRegionClear; + } + + @Override + void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) { + // Synchronized to avoid other threads invoking clear on this vm/node. + synchronized (clearLock) { + partitionedRegionClear.doClear(regionEvent, cacheWrite, this); + } + } + + boolean hasAnyClientsInterested() { + // Check local filter + if (getFilterProfile() != null && (getFilterProfile().hasInterest() || getFilterProfile() + .hasCQs())) { + return true; + } + // check peer server filters + return (getRegionAdvisor().hasPRServerWithInterest() + || getRegionAdvisor().hasPRServerWithCQs()); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java new file mode 100644 index 0000000..69277ef --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.OperationAbortedException; +import org.apache.geode.cache.PartitionedRegionPartialClearException; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class PartitionedRegionClear { + + private static final Logger logger = LogService.getLogger(); + + private static final String CLEAR_OPERATION = "_clearOperation"; + + private final int retryTime = 2 * 60 * 1000; + + private final PartitionedRegion partitionedRegion; + + private final LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipChange = false; + + public PartitionedRegionClear(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(new PartitionedRegionClearListener()); + } + + public boolean isLockedForListenerAndClientNotification() { + return lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + } catch (Exception ex) { + logger.warn("Caught exception while unlocking clear distributed lock. " + ex.getMessage()); + } + } + + void obtainLockForClear(RegionEventImpl event) { + obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); + sendPartitionedRegionClearMessage(event, + PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + } + + void releaseLockForClear(RegionEventImpl event) { + releaseClearLockLocal(); + sendPartitionedRegionClearMessage(event, + PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + } + + List clearRegion(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + List allBucketsCleared = new ArrayList(); + allBucketsCleared.addAll(clearRegionLocal(regionEvent)); + allBucketsCleared.addAll(sendPartitionedRegionClearMessage(regionEvent, + PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR)); + return allBucketsCleared; + } + + private void waitForPrimary() { + boolean retry; + PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime); + do { + retry = false; + for (BucketRegion bucketRegion : partitionedRegion.getDataStore() + .getAllLocalBucketRegions()) { + if (!bucketRegion.getBucketAdvisor().hasPrimary()) { + if (retryTimer.overMaximum()) { + throw new PartitionedRegionPartialClearException( + "Unable to find primary bucket region during clear operation for region: " + + partitionedRegion.getName()); + } + retryTimer.waitForBucketsRecovery(); + retry = true; + } + } + } while (retry); + } + + public ArrayList clearRegionLocal(RegionEventImpl regionEvent) { + ArrayList clearedBuckets = new ArrayList(); + membershipChange = false; + // Synchronized to handle the requester departure. + synchronized (lockForListenerAndClientNotification) { + if (partitionedRegion.getDataStore() != null) { + partitionedRegion.getDataStore().lockBucketCreationForRegionClear(); + try { + boolean retry; + do { + waitForPrimary(); + + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + if (localPrimaryBucketRegion.size() > 0) { + localPrimaryBucketRegion.clear(); + } + clearedBuckets.add(localPrimaryBucketRegion.getId()); + } + + if (membershipChange) { + membershipChange = false; + retry = true; + } else { + retry = false; + } + + } while (retry); + doAfterClear(regionEvent); + } finally { + partitionedRegion.getDataStore().unlockBucketCreationForRegionClear(); + } + } else { + // Non data-store with client queue and listener + doAfterClear(regionEvent); + } + } + return clearedBuckets; + } + + private void doAfterClear(RegionEventImpl regionEvent) { + if (partitionedRegion.hasAnyClientsInterested()) { + notifyClients(regionEvent); + } + + if (partitionedRegion.hasListener()) { + partitionedRegion.dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent); + } + } + + void notifyClients(RegionEventImpl event) { + // Set client routing information into the event + // The clear operation in case of PR is distributed differently + // hence the FilterRoutingInfo is set here instead of + // DistributedCacheOperation.distribute(). + event.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR); + if (!partitionedRegion.isUsedForMetaRegion() && !partitionedRegion + .isUsedForPartitionedRegionAdmin() + && !partitionedRegion.isUsedForPartitionedRegionBucket() && !partitionedRegion + .isUsedForParallelGatewaySenderQueue()) { + + FilterRoutingInfo localCqFrInfo = + partitionedRegion.getFilterProfile().getFilterRoutingInfoPart1(event, + FilterProfile.NO_PROFILES, Collections.emptySet()); + + FilterRoutingInfo localCqInterestFrInfo = + partitionedRegion.getFilterProfile().getFilterRoutingInfoPart2(localCqFrInfo, event); + + if (localCqInterestFrInfo != null) { + event.setLocalFilterInfo(localCqInterestFrInfo.getLocalFilterInfo()); + } + } + partitionedRegion.notifyBridgeClients(event); + } + + protected void obtainClearLockLocal(InternalDistributedMember requester) { + synchronized (lockForListenerAndClientNotification) { + // Check if the member is still part of the distributed system + if (!partitionedRegion.getDistributionManager().isCurrentMember(requester)) { + return; + } + + lockForListenerAndClientNotification.setLocked(requester); + if (partitionedRegion.getDataStore() != null) { + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + try { + localPrimaryBucketRegion.lockLocallyForClear(partitionedRegion.getDistributionManager(), + partitionedRegion.getMyId(), null); + } catch (Exception ex) { + partitionedRegion.checkClosed(); + } + } + } + } + } + + protected void releaseClearLockLocal() { + synchronized (lockForListenerAndClientNotification) { + if (lockForListenerAndClientNotification.getLockRequester() == null) { + // The member has left. + return; + } + try { + if (partitionedRegion.getDataStore() != null) { + + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + try { + localPrimaryBucketRegion.releaseLockLocallyForClear(null); + } catch (Exception ex) { + logger.debug( + "Unable to acquire clear lock for bucket region " + localPrimaryBucketRegion + .getName(), + ex.getMessage()); + partitionedRegion.checkClosed(); + } + } + } + } finally { + lockForListenerAndClientNotification.setUnLocked(); + } + } + } + + private List sendPartitionedRegionClearMessage(RegionEventImpl event, + PartitionedRegionClearMessage.OperationType op) { + RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone(); + eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR); + + do { + try { + return attemptToSendPartitionedRegionClearMessage(event, op); + } catch (ForceReattemptException reattemptException) { + // retry + } + } while (true); + } + + private List attemptToSendPartitionedRegionClearMessage(RegionEventImpl event, + PartitionedRegionClearMessage.OperationType op) + throws ForceReattemptException { + List bucketsOperated = null; + + if (partitionedRegion.getPRRoot() == null) { + if (logger.isDebugEnabled()) { + logger.debug( + "Partition region {} failed to initialize. Remove its profile from remote members.", + this.partitionedRegion); + } + new UpdateAttributesProcessor(partitionedRegion, true).distribute(false); + return bucketsOperated; + } + + final HashSet configRecipients = + new HashSet(partitionedRegion.getRegionAdvisor().adviseAllPRNodes()); + + try { + final PartitionRegionConfig prConfig = + partitionedRegion.getPRRoot().get(partitionedRegion.getRegionIdentifier()); + + if (prConfig != null) { + Iterator itr = prConfig.getNodes().iterator(); + while (itr.hasNext()) { + InternalDistributedMember idm = ((Node) itr.next()).getMemberId(); + if (!idm.equals(partitionedRegion.getMyId())) { + configRecipients.add(idm); + } + } + } + } catch (CancelException ignore) { + // ignore + } + + try { + PartitionedRegionClearMessage.PartitionedRegionClearResponse resp = + new PartitionedRegionClearMessage.PartitionedRegionClearResponse( + partitionedRegion.getSystem(), + configRecipients); + PartitionedRegionClearMessage partitionedRegionClearMessage = + new PartitionedRegionClearMessage(configRecipients, partitionedRegion, resp, op, event); + partitionedRegionClearMessage.send(); + + resp.waitForRepliesUninterruptibly(); + bucketsOperated = resp.bucketsCleared; + + } catch (ReplyException e) { + Throwable t = e.getCause(); + if (t instanceof ForceReattemptException) { + throw (ForceReattemptException) t; + } + if (t instanceof PartitionedRegionPartialClearException) { + throw new PartitionedRegionPartialClearException(t.getMessage(), t); + } + logger.warn( + "PartitionedRegionClear#sendPartitionedRegionClearMessage: Caught exception during ClearRegionMessage send and waiting for response", + e); + } + return bucketsOperated; + } + + void doClear(RegionEventImpl regionEvent, boolean cacheWrite, + PartitionedRegion partitionedRegion) { + String lockName = CLEAR_OPERATION + partitionedRegion.getDisplayName(); + + try { + // distributed lock to make sure only one clear op is in progress in the cluster. + acquireDistributedClearLock(lockName); + + // Force all primary buckets to be created before clear. + PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion); + + // do cacheWrite + try { + partitionedRegion.cacheWriteBeforeRegionClear(regionEvent); + } catch (OperationAbortedException operationAbortedException) { + throw new CacheWriterException(operationAbortedException); + } + + // Check if there are any listeners or clients interested. If so, then clear write + // locks needs to be taken on all local and remote primary buckets in order to + // preserve the ordering of client events (for concurrent operations on the region). + boolean acquireClearLockForClientNotification = + (partitionedRegion.hasAnyClientsInterested() && partitionedRegion.hasListener()); + if (acquireClearLockForClientNotification) { + obtainLockForClear(regionEvent); + } + try { + List bucketsCleared = clearRegion(regionEvent, cacheWrite, null); + + if (partitionedRegion.getTotalNumberOfBuckets() != bucketsCleared.size()) { + String message = "Unable to clear all the buckets from the partitioned region " + + partitionedRegion.getName() + + ", either data (buckets) moved or member departed."; + + logger.warn(message + " expected to clear number of buckets: " + + partitionedRegion.getTotalNumberOfBuckets() + + " actual cleared: " + bucketsCleared.size()); + + throw new PartitionedRegionPartialClearException(message); + } + } finally { + if (acquireClearLockForClientNotification) { + releaseLockForClear(regionEvent); + } + } + + } finally { + releaseDistributedClearLock(lockName); + } + } + + void handleClearFromDepartedMember(InternalDistributedMember departedMember) { + if (departedMember.equals(lockForListenerAndClientNotification.getLockRequester())) { + synchronized (lockForListenerAndClientNotification) { + if (lockForListenerAndClientNotification.getLockRequester() != null) { + releaseClearLockLocal(); + } + } + } + } + + class LockForListenerAndClientNotification { + + private boolean locked = false; + + private InternalDistributedMember lockRequester; + + synchronized void setLocked(InternalDistributedMember member) { + locked = true; + lockRequester = member; + } + + synchronized void setUnLocked() { + locked = false; + lockRequester = null; + } + + synchronized boolean isLocked() { + return locked; + } + + synchronized InternalDistributedMember getLockRequester() { + return lockRequester; + } + } + + protected class PartitionedRegionClearListener implements MembershipListener { + + @Override + public synchronized void memberDeparted(DistributionManager distributionManager, + InternalDistributedMember id, boolean crashed) { + membershipChange = true; + handleClearFromDepartedMember(id); + } + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java new file mode 100755 index 0000000..b66ab44 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.geode.DataSerializer; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.Operation; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.ReplyMessage; +import org.apache.geode.distributed.internal.ReplyProcessor21; +import org.apache.geode.distributed.internal.ReplySender; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Assert; +import org.apache.geode.internal.NanoTimer; +import org.apache.geode.internal.cache.partitioned.PartitionMessage; +import org.apache.geode.internal.logging.log4j.LogMarker; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class PartitionedRegionClearMessage extends PartitionMessage { + + public enum OperationType { + OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR, + } + + private Object cbArg; + + private OperationType op; + + private EventID eventID; + + private PartitionedRegion partitionedRegion; + + private ArrayList bucketsCleared; + + @Override + public EventID getEventID() { + return eventID; + } + + public PartitionedRegionClearMessage() {} + + PartitionedRegionClearMessage(Set recipients, PartitionedRegion region, + ReplyProcessor21 processor, PartitionedRegionClearMessage.OperationType operationType, + final RegionEventImpl event) { + super(recipients, region.getPRId(), processor); + partitionedRegion = region; + op = operationType; + cbArg = event.getRawCallbackArgument(); + eventID = event.getEventId(); + } + + public OperationType getOp() { + return op; + } + + public void send() { + Assert.assertTrue(getRecipients() != null, "ClearMessage NULL recipients set"); + setTransactionDistributed(partitionedRegion.getCache().getTxManager().isDistributed()); + partitionedRegion.getDistributionManager().putOutgoing(this); + } + + @Override + protected Throwable processCheckForPR(PartitionedRegion pr, + DistributionManager distributionManager) { + if (pr != null && !pr.getDistributionAdvisor().isInitialized()) { + Throwable thr = new ForceReattemptException( + String.format("%s : could not find partitioned region with Id %s", + distributionManager.getDistributionManagerId(), + pr.getRegionIdentifier())); + return thr; + } + return null; + } + + @Override + protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm, + PartitionedRegion partitionedRegion, + long startTime) throws CacheException { + + if (partitionedRegion == null) { + return true; + } + + if (partitionedRegion.isDestroyed()) { + return true; + } + + if (op == OperationType.OP_LOCK_FOR_PR_CLEAR) { + partitionedRegion.getPartitionedRegionClear().obtainClearLockLocal(getSender()); + } else if (op == OperationType.OP_UNLOCK_FOR_PR_CLEAR) { + partitionedRegion.getPartitionedRegionClear().releaseClearLockLocal(); + } else { + RegionEventImpl event = + new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, this.cbArg, true, + partitionedRegion.getMyId(), + getEventID()); + bucketsCleared = partitionedRegion.getPartitionedRegionClear().clearRegionLocal(event); + } + return true; + } + + @Override + protected void appendFields(StringBuilder buff) { + super.appendFields(buff); + buff.append(" cbArg=").append(this.cbArg).append(" op=").append(this.op); + } + + @Override + public int getDSFID() { + return CLEAR_PARTITIONED_REGION_MESSAGE; + } + + @Override + public void fromData(DataInput in, + DeserializationContext context) throws IOException, ClassNotFoundException { + super.fromData(in, context); + this.cbArg = DataSerializer.readObject(in); + op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()]; + eventID = DataSerializer.readObject(in); + } + + @Override + public void toData(DataOutput out, + SerializationContext context) throws IOException { + super.toData(out, context); + DataSerializer.writeObject(this.cbArg, out); + out.writeByte(op.ordinal()); + DataSerializer.writeObject(eventID, out); + } + + /** + * The response on which to wait for all the replies. This response ignores any exceptions + * received from the "far side" + */ + public static class PartitionedRegionClearResponse extends ReplyProcessor21 { + CopyOnWriteArrayList bucketsCleared = new CopyOnWriteArrayList(); + + public PartitionedRegionClearResponse(InternalDistributedSystem system, Set initMembers) { + super(system, initMembers); + } + + @Override + public void process(DistributionMessage msg) { + if (msg instanceof PartitionedRegionClearReplyMessage) { + List buckets = ((PartitionedRegionClearReplyMessage) msg).bucketsCleared; + if (buckets != null) { + bucketsCleared.addAll(buckets); + } + } + super.process(msg, true); + } + } + + @Override + protected void sendReply(InternalDistributedMember member, int processorId, + DistributionManager distributionManager, ReplyException ex, + PartitionedRegion partitionedRegion, long startTime) { + if (partitionedRegion != null) { + if (startTime > 0) { + partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime); + } + } + PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage + .send(member, processorId, getReplySender(distributionManager), op, bucketsCleared, + ex); + } + + public static class PartitionedRegionClearReplyMessage extends ReplyMessage { + + private ArrayList bucketsCleared; + + private OperationType op; + + @Override + public boolean getInlineProcess() { + return true; + } + + /** + * Empty constructor to conform to DataSerializable interface + */ + public PartitionedRegionClearReplyMessage() {} + + private PartitionedRegionClearReplyMessage(int processorId, OperationType op, + ArrayList bucketsCleared, ReplyException ex) { + super(); + this.bucketsCleared = bucketsCleared; + this.op = op; + setProcessorId(processorId); + setException(ex); + } + + /** Send an ack */ + public static void send(InternalDistributedMember recipient, int processorId, ReplySender dm, + OperationType op, ArrayList bucketsCleared, ReplyException ex) { + + Assert.assertTrue(recipient != null, "partitionedRegionClearReplyMessage NULL reply message"); + + PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage m = + new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId, op, + bucketsCleared, ex); + + m.setRecipient(recipient); + dm.putOutgoing(m); + } + + /** + * Processes this message. This method is invoked by the receiver of the message. + * + * @param dm the distribution manager that is processing the message. + */ + @Override + public void process(final DistributionManager dm, final ReplyProcessor21 rp) { + final long startTime = getTimestamp(); + + if (rp == null) { + if (LogService.getLogger().isTraceEnabled(LogMarker.DM_VERBOSE)) { + LogService.getLogger().trace(LogMarker.DM_VERBOSE, "{}: processor not found", this); + } + return; + } + + rp.process(this); + + dm.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime); + } + + @Override + public int getDSFID() { + return CLEAR_PARTITIONED_REGION_REPLY_MESSAGE; + } + + @Override + public void fromData(DataInput in, + DeserializationContext context) throws IOException, ClassNotFoundException { + super.fromData(in, context); + op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()]; + bucketsCleared = DataSerializer.readArrayList(in); + } + + @Override + public void toData(DataOutput out, + SerializationContext context) throws IOException { + super.toData(out, context); + out.writeByte(op.ordinal()); + DataSerializer.writeArrayList(bucketsCleared, out); + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("PartitionedRegionClearReplyMessage ") + .append("processorId=").append(this.processorId) + .append(" sender=").append(sender) + .append(" bucketsCleared ").append(this.bucketsCleared) + .append(" exception=").append(getException()); + return sb.toString(); + } + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java index 23a7487..578ed3e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java @@ -980,6 +980,14 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { } } + protected void lockBucketCreationForRegionClear() { + bucketCreationLock.writeLock().lock(); + } + + protected void unlockBucketCreationForRegionClear() { + bucketCreationLock.writeLock().unlock(); + } + /** * Gets the total amount of memory in bytes allocated for all values for this PR in this VM. This * is the current memory (MB) watermark for data in this PR. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java index 5d2ff24..13ad666 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java @@ -851,10 +851,21 @@ public class RegionAdvisor extends CacheDistributionAdvisor { && prof.filterProfile.hasInterest(); }; + @Immutable + private static final Filter prServerWithCqFilter = profile -> { + CacheProfile prof = (CacheProfile) profile; + return prof.isPartitioned && prof.hasCacheServer && prof.filterProfile != null + && prof.filterProfile.hasCQs(); + }; + public boolean hasPRServerWithInterest() { return satisfiesFilter(prServerWithInterestFilter); } + public boolean hasPRServerWithCQs() { + return satisfiesFilter(prServerWithCqFilter); + } + /** * return the set of all members who must receive operation notifications * diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt index eeee864..d69c6b5 100644 --- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt +++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt @@ -79,6 +79,7 @@ org/apache/geode/cache/NoSubscriptionServersAvailableException,true,848408601915 org/apache/geode/cache/Operation,true,-7521751729852504238,ordinal:byte org/apache/geode/cache/OperationAbortedException,true,-8293166225026556949 org/apache/geode/cache/PartitionedRegionDistributionException,true,-3004093739855972548 +org/apache/geode/cache/PartitionedRegionPartialClearException,false org/apache/geode/cache/PartitionedRegionStorageException,true,5905463619475329732 org/apache/geode/cache/RegionAccessException,true,3142958723089038406 org/apache/geode/cache/RegionDestroyedException,true,319804842308010754,regionFullPath:java/lang/String @@ -303,6 +304,7 @@ org/apache/geode/internal/cache/PRContainsValueFunction,false org/apache/geode/internal/cache/PRHARedundancyProvider$ArrayListWithClearState,true,1,wasCleared:boolean org/apache/geode/internal/cache/PartitionedRegion$PRIdMap,true,3667357372967498179,cleared:boolean org/apache/geode/internal/cache/PartitionedRegion$SizeEntry,false,isPrimary:boolean,size:int +org/apache/geode/internal/cache/PartitionedRegionClearMessage$OperationType,false org/apache/geode/internal/cache/PartitionedRegionDataStore$CreateBucketResult,false,nowExists:boolean org/apache/geode/internal/cache/PartitionedRegionException,true,5113786059279106007 org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator$MemberResultsList,false,isLastChunkReceived:boolean diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java index c7cf5a6..d3397eb 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java @@ -51,7 +51,9 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest { when(ba.getPrimaryMoveReadLock()).thenReturn(primaryMoveReadLock); when(ba.getProxyBucketRegion()).thenReturn(mock(ProxyBucketRegion.class)); when(ba.isPrimary()).thenReturn(true); - + PartitionedRegionClear clearPR = mock(PartitionedRegionClear.class); + when(clearPR.isLockedForListenerAndClientNotification()).thenReturn(true); + when(pr.getPartitionedRegionClear()).thenReturn(clearPR); ira.setPartitionedRegion(pr).setPartitionedRegionBucketRedundancy(1).setBucketAdvisor(ba); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java index 898c4f7..e02ba2c 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java @@ -58,7 +58,6 @@ import org.mockito.junit.MockitoRule; import org.apache.geode.CancelCriterion; import org.apache.geode.Statistics; import org.apache.geode.cache.AttributesFactory; -import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.CacheLoader; import org.apache.geode.cache.CacheWriter; import org.apache.geode.cache.Operation; @@ -221,22 +220,6 @@ public class PartitionedRegionTest { spyPartitionedRegion.clear(); } - @Test(expected = CacheClosedException.class) - public void clearShouldThrowCacheClosedExceptionIfShutdownAll() { - PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); - RegionEventImpl regionEvent = - new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false, - spyPartitionedRegion.getMyId(), true); - when(cache.isCacheAtShutdownAll()).thenReturn(true); - when(cache.getCacheClosedException("Cache is shutting down")) - .thenReturn(new CacheClosedException("Cache is shutting down")); - DistributedLockService lockService = mock(DistributedLockService.class); - when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService); - String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_'); - when(lockService.lock(lockName, -1, -1)).thenReturn(true); - spyPartitionedRegion.basicClear(regionEvent, true); - } - @Test public void createClearPRMessagesShouldCreateMessagePerBucket() { PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); @@ -249,28 +232,6 @@ public class PartitionedRegionTest { assertThat(msgs.size()).isEqualTo(3); } - @Test - public void sendEachMessagePerBucket() { - PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); - RegionEventImpl regionEvent = - new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false, - spyPartitionedRegion.getMyId(), true); - when(cache.isCacheAtShutdownAll()).thenReturn(false); - DistributedLockService lockService = mock(DistributedLockService.class); - when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService); - when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3); - String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_'); - when(lockService.lock(lockName, -1, -1)).thenReturn(true); - when(spyPartitionedRegion.hasListener()).thenReturn(true); - doNothing().when(spyPartitionedRegion).dispatchListenerEvent(any(), any()); - doNothing().when(spyPartitionedRegion).notifyBridgeClients(eq(regionEvent)); - doNothing().when(spyPartitionedRegion).checkReadiness(); - doNothing().when(lockService).unlock(lockName); - spyPartitionedRegion.basicClear(regionEvent, true); - verify(spyPartitionedRegion, times(3)).sendClearMsgByBucket(any(), any()); - verify(spyPartitionedRegion, times(1)).dispatchListenerEvent(any(), any()); - verify(spyPartitionedRegion, times(1)).notifyBridgeClients(eq(regionEvent)); - } @Test public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() { diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java index 76efebe..2607e19 100644 --- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java +++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java @@ -56,6 +56,8 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer // NOTE, codes < -65536 will take 4 bytes to serialize // NOTE, codes < -128 will take 2 bytes to serialize + short CLEAR_PARTITIONED_REGION_REPLY_MESSAGE = -166; + short CLEAR_PARTITIONED_REGION_MESSAGE = -165; short PR_CLEAR_REPLY_MESSAGE = -164; short PR_CLEAR_MESSAGE = -163;