This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 4d5f84c27a GEODE-10310: Add disable reattempt on CacheClose (#7690)
4d5f84c27a is described below
commit 4d5f84c27aae4b268731e7afce6aa078f27c4e94
Author: Mario Ivanac <[email protected]>
AuthorDate: Thu May 19 22:22:07 2022 +0200
GEODE-10310: Add disable reattempt on CacheClose (#7690)
* GEODE-10310: Add disable reattempt on CacheClose
---
...onedRegionCacheCloseNoRetryDistributedTest.java | 288 +++++++++++++++++++++
.../cache/partitioned/PartitionMessage.java | 14 +
2 files changed, 302 insertions(+)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionCacheCloseNoRetryDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionCacheCloseNoRetryDistributedTest.java
new file mode 100644
index 0000000000..e5e7f3ae4c
--- /dev/null
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionCacheCloseNoRetryDistributedTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
+import static
org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+import org.apache.geode.util.internal.GeodeGlossary;
+
+public class PartitionedRegionCacheCloseNoRetryDistributedTest implements
Serializable {
+
+ private String partitionedRegionName;
+
+ private VM vm0;
+ private VM vm1;
+ private VM vm2;
+ private VM vm3;
+
+ private static final long TIMEOUT_MILLIS =
GeodeAwaitility.getTimeout().toMillis();
+
+ @Rule
+ public CacheRule cacheRule =
+ CacheRule.builder().addConfig(getDistributedSystemProperties()).build();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+
+ @Before
+ public void setUp() {
+ vm0 = getVM(0);
+ vm1 = getVM(1);
+ vm2 = getVM(2);
+ vm3 = getVM(3);
+
+ invokeInEveryVM(() -> {
+ System.setProperty(
+ GeodeGlossary.GEMFIRE_PREFIX +
"PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE",
+ "true");
+ });
+ String uniqueName = getClass().getSimpleName() + "-" +
testName.getMethodName();
+ partitionedRegionName = uniqueName + "-partitionedRegion";
+ }
+
+ @After
+ public void tearDown() {
+ invokeInEveryVM(() -> {
+ System.clearProperty(
+ GeodeGlossary.GEMFIRE_PREFIX +
"PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE");
+ InternalResourceManager.setResourceObserver(null);
+ DistributionMessageObserver.setInstance(null);
+ });
+ }
+
+ private Properties getDistributedSystemProperties() {
+ Properties config = new Properties();
+ config.setProperty(SERIALIZABLE_OBJECT_FILTER,
TestFunction.class.getName());
+ return config;
+ }
+
+
+ @Test
+ public void testCacheCloseDuringWrite()
+ throws InterruptedException {
+ int redundantCopies = 1;
+ int recoveryDelay = -1;
+ int numBuckets = 100;
+ boolean diskSynchronous = true;
+
+ vm0.invoke(() -> {
+ createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets,
diskSynchronous);
+ createData(0, numBuckets, "a");
+ });
+
+ vm1.invoke(() -> {
+ createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets,
diskSynchronous);
+ });
+
+ // Need to invoke this async because vm1 will wait for vm0 to come back
online
+ // unless we explicitly revoke it.
+
+ int endData = 10000;
+
+ AsyncInvocation createRegionDataAsync = vm0.invokeAsync(
+ () -> {
+ Exception exc = null;
+ try {
+ createData(numBuckets, endData, "b");
+ } catch (Exception e) {
+ exc = e;
+ }
+
+ assertThat(exc).isNotNull();
+ assertThat(exc).isInstanceOf(InternalGemFireException.class);
+
+ });
+
+ AsyncInvocation closeCacheAsync = vm1.invokeAsync(
+ () -> {
+ getCache().close();
+ });
+
+ closeCacheAsync.get();
+ createRegionDataAsync.get();
+
+ }
+
+
+ @Test
+ public void testCacheCloseDuringInvalidate()
+ throws InterruptedException {
+ int redundantCopies = 1;
+ int recoveryDelay = -1;
+ int numBuckets = 100;
+ boolean diskSynchronous = true;
+ int endData = 10000;
+
+ vm0.invoke(() -> {
+ createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets,
diskSynchronous);
+ createData(0, endData, "a");
+ });
+
+ vm1.invoke(() -> {
+ createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets,
diskSynchronous);
+ });
+
+ // Need to invoke this async because vm1 will wait for vm0 to come back
online
+ // unless we explicitly revoke it.
+
+ AsyncInvocation invalidateRegionDataAsync = vm0.invokeAsync(
+ () -> {
+ Exception exc = null;
+ try {
+ invalidateData(0, endData);
+ } catch (Exception e) {
+ exc = e;
+ }
+
+ assertThat(exc).isNotNull();
+ assertThat(exc).isInstanceOf(InternalGemFireException.class);
+
+ });
+
+ AsyncInvocation closeCacheAsync = vm1.invokeAsync(
+ () -> {
+ getCache().close();
+ });
+
+ closeCacheAsync.get();
+ invalidateRegionDataAsync.get();
+
+ }
+
+
+ private void createPartitionedRegion(final int redundancy, final int
recoveryDelay,
+ final int numBuckets, final boolean synchronous) throws
InterruptedException {
+ CountDownLatch recoveryDone = new CountDownLatch(1);
+
+ if (redundancy > 0) {
+ InternalResourceManager.ResourceObserver observer =
+ new InternalResourceManager.ResourceObserverAdapter() {
+ @Override
+ public void recoveryFinished(Region region) {
+ recoveryDone.countDown();
+ }
+ };
+
+ InternalResourceManager.setResourceObserver(observer);
+ } else {
+ recoveryDone.countDown();
+ }
+
+ PartitionAttributesFactory<?, ?> partitionAttributesFactory = new
PartitionAttributesFactory();
+ partitionAttributesFactory.setRedundantCopies(redundancy);
+ partitionAttributesFactory.setRecoveryDelay(recoveryDelay);
+ partitionAttributesFactory.setTotalNumBuckets(numBuckets);
+ partitionAttributesFactory.setLocalMaxMemory(500);
+
+ RegionFactory<?, ?> regionFactory =
+ getCache().createRegionFactory(PARTITION_PERSISTENT);
+ regionFactory.setDiskSynchronous(synchronous);
+ regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
+
+ regionFactory.create(partitionedRegionName);
+
+ recoveryDone.await(TIMEOUT_MILLIS, MILLISECONDS);
+ }
+
+
+ private InternalCache getCache() {
+ return cacheRule.getOrCreateCache();
+ }
+
+ private void createData(final int startKey, final int endKey, final String
value) {
+ createDataFor(startKey, endKey, value, partitionedRegionName);
+ }
+
+ private void createDataFor(final int startKey, final int endKey, final
String value,
+ final String regionName) {
+ Region<Integer, String> region = getCache().getRegion(regionName);
+ for (int i = startKey; i < endKey; i++) {
+ region.put(i, value);
+ }
+ }
+
+ private void invalidateData(final int startKey, final int endKey) {
+ invalidateDataFor(startKey, endKey, partitionedRegionName);
+ }
+
+ private void invalidateDataFor(final int startKey, final int endKey,
+ final String regionName) {
+ Region<?, ?> region = getCache().getRegion(regionName);
+ for (int i = startKey; i < endKey; i++) {
+ region.invalidate(i);
+ }
+ }
+
+
+ private static class TestFunction implements Function, Serializable {
+
+ @Override
+ public void execute(final FunctionContext context) {
+ context.getResultSender().lastResult(null);
+ }
+
+ @Override
+ public String getId() {
+ return TestFunction.class.getSimpleName();
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public boolean optimizeForWrite() {
+ return false;
+ }
+
+ @Override
+ public boolean isHA() {
+ return false;
+ }
+ }
+
+
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
index 49e5f51e0a..8ebc4da469 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
@@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
+import org.apache.geode.InternalGemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.CacheClosedException;
@@ -67,6 +68,7 @@ import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.StaticSerialization;
import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.util.internal.GeodeGlossary;
/**
* The base PartitionedRegion message type upon which other messages should be
based.
@@ -84,6 +86,10 @@ public abstract class PartitionMessage extends
DistributionMessage
"Unknown exception")
.fillInStackTrace();
+
+ /**
+ * Value of the boolean system property
+ * {@code GeodeGlossary.GEMFIRE_PREFIX + "PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE"}.
+ * The field is static and final, so the property is read once, when this class is initialized;
+ * changing the property afterwards has no effect on this flag. When {@code true}, the
+ * CancelException branch of the response handling throws an InternalGemFireException instead of
+ * continuing to the ForceReattemptException path named in its log message.
+ */
+ private static final boolean disableReatemptOnCacheClose =
+ Boolean.getBoolean(
+ GeodeGlossary.GEMFIRE_PREFIX +
"PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE");
int regionId;
int processorId;
@@ -838,6 +844,14 @@ public abstract class PartitionMessage extends
DistributionMessage
throw new PrimaryBucketException(
"Peer failed primary test", t);
} else if (t instanceof CancelException) {
+ if (disableReatemptOnCacheClose) {
+ logger.debug(
+ "PartitionResponse got CacheClosedException from {}, throwing
InternalGemFireException",
+ e.getSender(), t);
+ throw new InternalGemFireException(
+ "No retry after CacheClosedException from " + e.getSender());
+ }
+
logger.debug(
"PartitionResponse got CacheClosedException from {}, throwing
ForceReattemptException",
e.getSender(), t);