This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit d93f3d554ed4d0746fedc90b220dd8a701c08f47
Author: Kirk Lund <[email protected]>
AuthorDate: Thu Mar 22 19:48:11 2018 -0700

    GEODE-1279: Rename Bug41733DUnitTest as BucketCreationRequesterCrashHARegressionTest
---
 ...cketCreationRequesterCrashHARegressionTest.java | 261 +++++++++++++++++++++
 .../geode/internal/cache/Bug41733DUnitTest.java    | 211 -----------------
 2 files changed, 261 insertions(+), 211 deletions(-)

diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketCreationRequesterCrashHARegressionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketCreationRequesterCrashHARegressionTest.java
new file mode 100644
index 0000000..c0e124f
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketCreationRequesterCrashHARegressionTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.test.dunit.DistributedTestUtils.crashDistributedSystem;
+import static org.apache.geode.test.dunit.Host.getHost;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.ForcedDisconnectException;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.internal.cache.partitioned.ManageBucketMessage;
+import org.apache.geode.internal.cache.partitioned.ManageBucketMessage.ManageBucketReplyMessage;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.RMIException;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.CacheTestCase;
+import org.apache.geode.test.dunit.rules.SharedErrorCollector;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * Test to make sure that we can handle a crash of the member directing bucket creation.
+ *
+ * Replaces Bug41733DUnitTest.
+ *
+ * <p>
+ * TRAC #41733: Hang in BucketAdvisor.waitForPrimaryMember
+ */
+@Category(DistributedTest.class)
+public class BucketCreationRequesterCrashHARegressionTest extends CacheTestCase {
+
+  private String uniqueName;
+  private String hostName;
+  private int locatorPort;
+  private File locatorLog;
+
+  private VM server1;
+  private VM server2;
+  private VM locator;
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
+  public SharedErrorCollector errorCollector = new SharedErrorCollector();
+
+  @Before
+  public void setUp() throws Exception {
+    server1 = getHost(0).getVM(0);
+    server2 = getHost(0).getVM(1);
+    locator = getHost(0).getVM(2);
+
+    uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    hostName = NetworkUtils.getServerHostName(server1.getHost());
+    locatorLog = new File(temporaryFolder.newFolder(uniqueName), "locator.log");
+
+    locatorPort = locator.invoke(() -> startLocator());
+    assertThat(locatorPort).isGreaterThan(0);
+
+    server1.invoke(() -> createServerCache());
+    server2.invoke(() -> createServerCache());
+
+    // cluster should ONLY have 3 members (our 2 servers and 1 locator)
+    assertThat(server1.invoke(
+        () -> getCache().getDistributionManager().getDistributionManagerIdsIncludingAdmin()))
+            .hasSize(3);
+
+    addIgnoredException(ForcedDisconnectException.class);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    DistributionMessageObserver.setInstance(null);
+    invokeInEveryVM(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+
+    disconnectAllFromDS();
+  }
+
+  /**
+   * Test that we can handle a member departing after creating a bucket on the remote node but
+   * before we choose a primary.
+   */
+  @Test
+  public void putShouldNotHangAfterBucketCrashesBeforePrimarySelection() throws Exception {
+    server1.invoke(
+        () -> handleBeforeProcessMessage(ManageBucketReplyMessage.class, () -> crashServer()));
+    server1.invoke(() -> createPartitionedRegion());
+
+    // Create a couple of buckets in server1. This will make sure
+    // the next bucket we create will be created in server2.
+    server1.invoke(() -> putData(0, 2, "a"));
+
+    server2.invoke(() -> createPartitionedRegion());
+
+    // Trigger a bucket creation in server2, which should cause server1 to close its cache.
+    assertThatThrownBy(() -> server1.invoke(() -> putData(3, 4, "a")))
+        .isInstanceOf(RMIException.class)
+        .hasCauseInstanceOf(DistributedSystemDisconnectedException.class);
+
+    assertThat(server2.invoke(() -> getBucketList())).containsExactly(3);
+
+    // This shouldn't hang, because the bucket creation should finish.
+    server2.invoke(() -> putData(3, 4, "a"));
+  }
+
+  /**
+   * Test that we can handle a member departing while we are in the process of creating the bucket
+   * on the remote node.
+   */
+  @Test
+  public void putShouldNotHangAfterServerWithBucketCrashes() throws Exception {
+    server2.invoke(() -> handleBeforeProcessMessage(ManageBucketMessage.class,
+        () -> server1.invoke(() -> crashServer())));
+    server1.invoke(() -> createPartitionedRegion());
+
+    // Create a couple of buckets in server1. This will make sure
+    // the next bucket we create will be created in server2.
+    server1.invoke(() -> putData(0, 2, "a"));
+
+    server2.invoke(() -> createPartitionedRegion());
+
+    // Trigger a bucket creation in server2, which should cause server1 to close its cache.
+    assertThatThrownBy(() -> server1.invoke(() -> putData(3, 4, "a")))
+        .isInstanceOf(RMIException.class)
+        .hasCauseInstanceOf(DistributedSystemDisconnectedException.class);
+
+    assertThat(server2.invoke(() -> getBucketList())).containsExactly(3);
+
+    // This shouldn't hang, because the bucket creation should finish.
+    server2.invoke(() -> putData(3, 4, "a"));
+  }
+
+  private Properties createLocatorConfig() {
+    Properties config = new Properties();
+    config.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
+    config.setProperty(ENABLE_NETWORK_PARTITION_DETECTION, "false");
+    config.setProperty(USE_CLUSTER_CONFIGURATION, "false");
+    return config;
+  }
+
+  private Properties createServerConfig() {
+    Properties config = createLocatorConfig();
+    config.setProperty(LOCATORS, hostName + "[" + locatorPort + "]");
+    return config;
+  }
+
+  private int startLocator() throws IOException {
+    Properties config = createLocatorConfig();
+    InetAddress bindAddress = InetAddress.getByName(hostName);
+    Locator locator = Locator.startLocatorAndDS(locatorPort, locatorLog, bindAddress, config);
+    return locator.getPort();
+  }
+
+  private void createServerCache() {
+    getCache(createServerConfig());
+  }
+
+  private void createPartitionedRegion() {
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setRedundantCopies(0);
+
+    AttributesFactory af = new AttributesFactory();
+    af.setDataPolicy(DataPolicy.PARTITION);
+    af.setPartitionAttributes(paf.create());
+
+    getCache().createRegion(uniqueName, af.create());
+  }
+
+  private void putData(final int startKey, final int endKey, final String value) {
+    Region<Integer, String> region = getCache().getRegion(uniqueName);
+
+    for (int i = startKey; i < endKey; i++) {
+      region.put(i, value);
+    }
+  }
+
+  private Set<Integer> getBucketList() {
+    PartitionedRegion region = (PartitionedRegion) getCache().getRegion(uniqueName);
+    return new TreeSet<>(region.getDataStore().getAllLocalBucketIds());
+  }
+
+  private void handleBeforeProcessMessage(final Class<? extends DistributionMessage> messageClass,
+      final SerializableRunnableIF runnable) {
+    DistributionMessageObserver
+        .setInstance(new RunnableBeforeProcessMessageObserver(messageClass, runnable));
+  }
+
+  private void crashServer() {
+    crashDistributedSystem(getSystem());
+  }
+
+  private class RunnableBeforeProcessMessageObserver extends DistributionMessageObserver {
+
+    private final Class<? extends DistributionMessage> messageClass;
+    private final SerializableRunnableIF runnable;
+
+    RunnableBeforeProcessMessageObserver(final Class<? extends DistributionMessage> messageClass,
+        final SerializableRunnableIF runnable) {
+      this.messageClass = messageClass;
+      this.runnable = runnable;
+    }
+
+    @Override
+    public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      if (messageClass.isInstance(message)) {
+        try {
+          runnable.run();
+        } catch (Exception e) {
+          errorCollector.addError(e);
+        }
+      }
+    }
+  }
+}

diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/Bug41733DUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/Bug41733DUnitTest.java
deleted file mode 100644
index ae2e664..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/Bug41733DUnitTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import static org.junit.Assert.*;
-
-import java.util.*;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.PartitionAttributesFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.distributed.*;
-import org.apache.geode.distributed.internal.ClusterDistributionManager;
-import org.apache.geode.distributed.internal.DistributionMessage;
-import org.apache.geode.distributed.internal.DistributionMessageObserver;
-import org.apache.geode.internal.cache.partitioned.ManageBucketMessage;
-import org.apache.geode.internal.cache.partitioned.ManageBucketMessage.ManageBucketReplyMessage;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.RMIException;
-import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
-
-/**
- * Test to make sure that we can handle a crash of the member directing bucket creation.
- */
-@Category(DistributedTest.class)
-public class Bug41733DUnitTest extends JUnit4CacheTestCase {
-
-  @Override
-  public final void preTearDownCacheTestCase() throws Exception {
-    disconnectAllFromDS();
-  }
-
-  @Override
-  public Properties getDistributedSystemProperties() {
-    Properties result = super.getDistributedSystemProperties();
-    result.put(ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION, "false");
-    return result;
-  }
-
-  /**
-   * Test the we can handle a member departing after creating a bucket on the remote node but before
-   * we choose a primary
-   */
-  @Test
-  public void testCrashAfterBucketCreation() throws Throwable {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-
-    vm0.invoke(new SerializableRunnable("Install observer") {
-
-      public void run() {
-        DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
-
-
-          @Override
-          public void beforeProcessMessage(ClusterDistributionManager dm,
-              DistributionMessage message) {
-            if (message instanceof ManageBucketReplyMessage) {
-              disconnectFromDS();
-            }
-          }
-        });
-
-      }
-    });
-    createPR(vm0, 0);
-
-    // Create a couple of buckets in VM0. This will make sure
-    // the next bucket we create will be created in VM 1.
-    putData(vm0, 0, 2, "a");
-
-    createPR(vm1, 0);
-
-    // Trigger a bucket creation in VM1, which should cause vm0 to close it's cache.
-    try {
-      putData(vm0, 3, 4, "a");
-      fail("should have received a cache closed exception");
-    } catch (RMIException e) {
-      if (!(e.getCause() instanceof DistributedSystemDisconnectedException)) {
-        throw e;
-      }
-    }
-
-    assertEquals(Collections.singleton(3), getBucketList(vm1));
-
-    // This shouldn't hang, because the bucket creation should finish,.
-    putData(vm1, 3, 4, "a");
-  }
-
-  /**
-   * Test the we can handle a member departing while we are in the process of creating the bucket on
-   * the remote node.
-   */
-  @Test
-  public void testCrashDuringBucketCreation() throws Throwable {
-    Host host = Host.getHost(0);
-    final VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-
-    vm1.invoke(new SerializableRunnable("Install observer") {
-
-      public void run() {
-
-        DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
-
-          @Override
-          public void beforeProcessMessage(ClusterDistributionManager dm,
-              DistributionMessage message) {
-            if (message instanceof ManageBucketMessage) {
-              vm0.invoke(() -> disconnectFromDS());
-            }
-          }
-        });
-
-      }
-    });
-    createPR(vm0, 0);
-
-    // Create a couple of buckets in VM0. This will make sure
-    // the next bucket we create will be created in VM 1.
-    putData(vm0, 0, 2, "a");
-
-    createPR(vm1, 0);
-
-    // Trigger a bucket creation in VM1, which should cause vm0 to close it's cache.
-    try {
-      putData(vm0, 3, 4, "a");
-      fail("should have received a cache closed exception");
-    } catch (RMIException e) {
-      if (!(e.getCause() instanceof DistributedSystemDisconnectedException)) {
-        throw e;
-      }
-    }
-
-    assertEquals(Collections.singleton(3), getBucketList(vm1));
-
-    // This shouldn't hang, because the bucket creation should finish,.
-    putData(vm1, 3, 4, "a");
-  }
-
-  private void createPR(VM vm0, final int redundancy) {
-    vm0.invoke(new SerializableRunnable("Create PR") {
-
-      public void run() {
-        Cache cache = getCache();
-        AttributesFactory af = new AttributesFactory();
-        PartitionAttributesFactory paf = new PartitionAttributesFactory();
-        paf.setRedundantCopies(redundancy);
-        af.setPartitionAttributes(paf.create());
-        af.setDataPolicy(DataPolicy.PARTITION);
-        cache.createRegion("region", af.create());
-      }
-
-    });
-  }
-
-  protected void putData(VM vm, final int startKey, final int endKey, final String value) {
-    SerializableRunnable createData = new SerializableRunnable() {
-
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion("region");
-
-        for (int i = startKey; i < endKey; i++) {
-          region.put(i, value);
-        }
-      }
-    };
-    vm.invoke(createData);
-  }
-
-  protected Set<Integer> getBucketList(VM vm0) {
-    return getBucketList(vm0, "region");
-  }
-
-  protected Set<Integer> getBucketList(VM vm0, final String regionName) {
-    SerializableCallable getBuckets = new SerializableCallable("get buckets") {
-
-      public Object call() throws Exception {
-        Cache cache = getCache();
-        PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
-        return new TreeSet<Integer>(region.getDataStore().getAllLocalBucketIds());
-      }
-    };
-
-    return (Set<Integer>) vm0.invoke(getBuckets);
-  }
-
-}

-- 
To stop receiving notification emails like this one, please contact
[email protected].
