This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 201cc52 Feature/geode 3647 (#852)
201cc52 is described below
commit 201cc525b757301a677a94f4a2adcd0eb2cbe256
Author: Nick Reich <[email protected]>
AuthorDate: Mon Oct 2 14:50:50 2017 -0700
Feature/geode 3647 (#852)
GEODE-3647: Fix race condition
This reverts commit 5cb2a591199845190751f52e4da758c5ccd4d44e.
Fix change to import that caused previous commit to fail.
---
.../geode/internal/cache/PartitionedRegion.java | 49 ++---
.../internal/cache/PartitionedRegionDataStore.java | 14 +-
.../PartitionedRegionAttributesMutatorTest.java | 211 +++++++++++++++++++++
.../cache/PartitionedRegionDataStoreJUnitTest.java | 63 ------
4 files changed, 243 insertions(+), 94 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 0ada1f9..0a0b802 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -100,6 +100,7 @@ import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.partition.PartitionListener;
import org.apache.geode.cache.partition.PartitionNotAvailableException;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.persistence.PersistentID;
import org.apache.geode.cache.query.FunctionDomainException;
@@ -9150,14 +9151,14 @@ public class PartitionedRegion extends LocalRegion
@Override
public ExpirationAttributes setEntryTimeToLive(ExpirationAttributes
timeToLive) {
ExpirationAttributes attr = super.setEntryTimeToLive(timeToLive);
- // Set to Bucket regions as well
- if (this.getDataStore() != null) { // not for accessors
- for (Object o : this.getDataStore().getAllLocalBuckets()) {
- Map.Entry entry = (Map.Entry) o;
- Region bucketRegion = (Region) entry.getValue();
- bucketRegion.getAttributesMutator().setEntryTimeToLive(timeToLive);
- }
- }
+
+ /*
+ * All buckets must be created to make this change, otherwise it is
possible for
+ * updatePRConfig(...) to make changes that cause bucket creation to live
lock
+ */
+ PartitionRegionHelper.assignBucketsToPartitions(this);
+ dataStore.lockBucketCreationAndVisit(
+ (bucketId, r) ->
r.getAttributesMutator().setEntryTimeToLive(timeToLive));
updatePRConfig(getPRConfigWithLatestExpirationAttributes(), false);
return attr;
}
@@ -9181,13 +9182,8 @@ public class PartitionedRegion extends LocalRegion
public CustomExpiry setCustomEntryTimeToLive(CustomExpiry custom) {
CustomExpiry expiry = super.setCustomEntryTimeToLive(custom);
// Set to Bucket regions as well
- if (this.getDataStore() != null) { // not for accessors
- for (Object o : this.getDataStore().getAllLocalBuckets()) {
- Map.Entry entry = (Map.Entry) o;
- Region bucketRegion = (Region) entry.getValue();
- bucketRegion.getAttributesMutator().setCustomEntryTimeToLive(custom);
- }
- }
+ dataStore.lockBucketCreationAndVisit(
+ (bucketId, r) ->
r.getAttributesMutator().setCustomEntryTimeToLive(custom));
return expiry;
}
@@ -9206,14 +9202,14 @@ public class PartitionedRegion extends LocalRegion
@Override
public ExpirationAttributes setEntryIdleTimeout(ExpirationAttributes
idleTimeout) {
ExpirationAttributes attr = super.setEntryIdleTimeout(idleTimeout);
+ /*
+ * All buckets must be created to make this change, otherwise it is
possible for
+ * updatePRConfig(...) to make changes that cause bucket creation to live
lock
+ */
+ PartitionRegionHelper.assignBucketsToPartitions(this);
// Set to Bucket regions as well
- if (this.getDataStore() != null) { // not for accessors
- for (Object o : this.getDataStore().getAllLocalBuckets()) {
- Map.Entry entry = (Map.Entry) o;
- Region bucketRegion = (Region) entry.getValue();
- bucketRegion.getAttributesMutator().setEntryIdleTimeout(idleTimeout);
- }
- }
+ dataStore.lockBucketCreationAndVisit(
+ (bucketId, r) ->
r.getAttributesMutator().setEntryIdleTimeout(idleTimeout));
updatePRConfig(getPRConfigWithLatestExpirationAttributes(), false);
return attr;
}
@@ -9228,13 +9224,8 @@ public class PartitionedRegion extends LocalRegion
public CustomExpiry setCustomEntryIdleTimeout(CustomExpiry custom) {
CustomExpiry expiry = super.setCustomEntryIdleTimeout(custom);
// Set to Bucket regions as well
- if (this.getDataStore() != null) { // not for accessors
- for (Object o : this.getDataStore().getAllLocalBuckets()) {
- Map.Entry entry = (Map.Entry) o;
- Region bucketRegion = (Region) entry.getValue();
- bucketRegion.getAttributesMutator().setCustomEntryIdleTimeout(custom);
- }
- }
+ dataStore.lockBucketCreationAndVisit(
+ (bucketId, r) ->
r.getAttributesMutator().setCustomEntryIdleTimeout(custom));
return expiry;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index dc91f0f..91d230e 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -953,6 +953,16 @@ public class PartitionedRegionDataStore implements
HasCachePerfStats {
}
}
+ protected void lockBucketCreationAndVisit(BucketVisitor visitor) {
+ StoppableWriteLock lock = this.bucketCreationLock.writeLock();
+ lock.lock();
+ try {
+ visitBuckets(visitor);
+ } finally {
+ lock.unlock();
+ }
+ }
+
/**
* Gets the total amount of memory in bytes allocated for all values for
this PR in this VM. This
* is the current memory (MB) watermark for data in this PR.
@@ -2476,8 +2486,8 @@ public class PartitionedRegionDataStore implements
HasCachePerfStats {
* Interface for visiting buckets
*/
// public visibility for tests
- public static abstract class BucketVisitor {
- abstract public void visit(Integer bucketId, Region r);
+ public interface BucketVisitor {
+ void visit(Integer bucketId, Region r);
}
// public visibility for tests
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java
new file mode 100644
index 0000000..e6cb4ee
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java
@@ -0,0 +1,211 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the
License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * *
Unless required by
+ * applicable law or agreed to in writing, software distributed under the
License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express * or implied.
+ * See the License for the specific language governing permissions and
limitations under * the
+ * License. *
+ *
+ */
+
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.CustomExpiry;
+import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverAdapter;
+import
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+@Category(IntegrationTest.class)
+public class PartitionedRegionAttributesMutatorTest {
+ private static final String TEST_REGION_NAME = "testRegion";
+ private static final int DEFAULT_WAIT_DURATION = 5;
+ private static final TimeUnit DEFAULT_WAIT_UNIT = TimeUnit.SECONDS;
+
+ @Rule
+ public ServerStarterRule server = new ServerStarterRule().withAutoStart();
+
+ private final CountDownLatch mutationMade = new CountDownLatch(1);
+ private final CountDownLatch bucketCreated = new CountDownLatch(1);
+ private PartitionedRegion pr;
+
+ @Test
+ public void testChangeCacheLoaderDuringBucketCreation()
+ throws InterruptedException, TimeoutException, ExecutionException {
+ createRegion();
+ CacheLoader loader = createTestCacheLoader();
+
+ CompletableFuture<Void> createBucket =
+ CompletableFuture.runAsync(() ->
PartitionRegionHelper.assignBucketsToPartitions(pr));
+ bucketCreated.await();
+ pr.getAttributesMutator().setCacheLoader(loader);
+ mutationMade.countDown();
+ createBucket.get(DEFAULT_WAIT_DURATION, DEFAULT_WAIT_UNIT);
+
+ getAllBucketRegions(pr).forEach(region -> assertEquals(loader,
region.getCacheLoader()));
+ }
+
+ @Test
+ public void testChangeCustomEntryTtlDuringBucketCreation()
+ throws InterruptedException, ExecutionException {
+ createRegion();
+ CustomExpiry customExpiry = createTestCustomExpiry();
+
+ CompletableFuture<Void> createBucket =
+ CompletableFuture.runAsync(() ->
PartitionRegionHelper.assignBucketsToPartitions(pr));
+ bucketCreated.await();
+ pr.getAttributesMutator().setCustomEntryTimeToLive(customExpiry);
+ mutationMade.countDown();
+ createBucket.get();
+
+ getAllBucketRegions(pr)
+ .forEach(region -> assertEquals(customExpiry,
region.customEntryTimeToLive));
+ }
+
+ @Test
+ public void testChangeCustomEntryIdleTimeoutDuringBucketCreation()
+ throws InterruptedException, ExecutionException {
+ createRegion();
+ CustomExpiry customExpiry = createTestCustomExpiry();
+
+ CompletableFuture<Void> createBucket =
+ CompletableFuture.runAsync(() ->
PartitionRegionHelper.assignBucketsToPartitions(pr));
+ bucketCreated.await();
+ pr.getAttributesMutator().setCustomEntryIdleTimeout(customExpiry);
+ mutationMade.countDown();
+ createBucket.get();
+
+ getAllBucketRegions(pr)
+ .forEach(region -> assertEquals(customExpiry,
region.customEntryIdleTimeout));
+ }
+
+ @Test
+ public void testChangeEntryIdleTimeoutDuringBucketCreation()
+ throws InterruptedException, ExecutionException {
+ createRegionWithFewBuckets();
+
+ CompletableFuture<Void> createBucket =
+ CompletableFuture.runAsync(() ->
PartitionRegionHelper.assignBucketsToPartitions(pr));
+ bucketCreated.await();
+ ExpirationAttributes expirationAttributes =
+ new ExpirationAttributes(1000, ExpirationAction.DESTROY);
+ pr.getAttributesMutator().setEntryIdleTimeout(expirationAttributes);
+ mutationMade.countDown();
+ createBucket.get();
+
+ getAllBucketRegions(pr)
+ .forEach(region -> assertEquals(expirationAttributes,
region.getEntryIdleTimeout()));
+ }
+
+ @Test
+ public void testChangeEntryTtlDuringBucketCreation()
+ throws InterruptedException, ExecutionException {
+ createRegionWithFewBuckets();
+
+ CompletableFuture<Void> createBucket =
+ CompletableFuture.runAsync(() ->
PartitionRegionHelper.assignBucketsToPartitions(pr));
+ bucketCreated.await();
+ ExpirationAttributes expirationAttributes =
+ new ExpirationAttributes(1000, ExpirationAction.DESTROY);
+ pr.getAttributesMutator().setEntryTimeToLive(expirationAttributes);
+ mutationMade.countDown();
+ createBucket.get();
+
+ getAllBucketRegions(pr)
+ .forEach(region -> assertEquals(expirationAttributes,
region.getEntryTimeToLive()));
+ }
+
+ private void createRegion() {
+ pr = (PartitionedRegion)
server.getCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setStatisticsEnabled(true).create(TEST_REGION_NAME);
+ setRegionObserver();
+ }
+
+ private void createRegionWithFewBuckets() {
+ PartitionAttributes partitionAttributes =
+ new PartitionAttributesFactory().setTotalNumBuckets(5).create();
+ pr = (PartitionedRegion)
server.getCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setStatisticsEnabled(true).setPartitionAttributes(partitionAttributes)
+ .create(TEST_REGION_NAME);
+ setRegionObserver();
+ }
+
+ // Adds an observer which will block bucket creation and wait for a loader
to be added
+ private void setRegionObserver() {
+ PartitionedRegionObserverHolder.setInstance(new
PartitionedRegionObserverAdapter() {
+ @Override
+ public void beforeAssignBucket(PartitionedRegion partitionedRegion, int
bucketId) {
+ try {
+ // Indicate that the bucket has been created
+ bucketCreated.countDown();
+
+ // Wait for the loader to be added. if the synchronization
+ // is correct, this would wait for ever because setting the
+ // cache loader will wait for this method. So time out after
+ // 1 second, which should be good enough to cause a failure
+ // if the synchronization is broken.
+ mutationMade.await(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted");
+ }
+ }
+ });
+ }
+
+ private Set<BucketRegion> getAllBucketRegions(PartitionedRegion pr) {
+ return pr.getDataStore().getAllLocalBucketRegions();
+ }
+
+ private CacheLoader createTestCacheLoader() {
+ return new CacheLoader() {
+ @Override
+ public void close() {}
+
+ @Override
+ public Object load(LoaderHelper helper) throws CacheLoaderException {
+ return null;
+ }
+ };
+ }
+
+ private CustomExpiry createTestCustomExpiry() {
+ return new CustomExpiry() {
+ @Override
+ public ExpirationAttributes getExpiry(Region.Entry entry) {
+ return null;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
index cd0917e..d9396da 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
@@ -15,10 +15,7 @@
package org.apache.geode.internal.cache;
import org.apache.geode.cache.*;
-import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.distributed.DistributedSystem;
-import
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverAdapter;
-import
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder;
import org.apache.geode.test.junit.categories.IntegrationTest;
import org.junit.After;
import org.junit.Before;
@@ -26,8 +23,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.junit.Assert.*;
@@ -108,64 +103,6 @@ public class PartitionedRegionDataStoreJUnitTest {
}
- @Test
- public void testChangeCacheLoaderDuringBucketCreation() throws Exception {
- final PartitionedRegion pr =
- (PartitionedRegion) cache.createRegionFactory(RegionShortcut.PARTITION)
- .create("testChangeCacheLoaderDuringBucketCreation");
-
- // Add an observer which will block bucket creation and wait for a loader
to be added
- final CountDownLatch loaderAdded = new CountDownLatch(1);
- final CountDownLatch bucketCreated = new CountDownLatch(1);
- PartitionedRegionObserverHolder.setInstance(new
PartitionedRegionObserverAdapter() {
- @Override
- public void beforeAssignBucket(PartitionedRegion partitionedRegion, int
bucketId) {
- try {
- // Indicate that the bucket has been created
- bucketCreated.countDown();
-
- // Wait for the loader to be added. if the synchronization
- // is correct, this would wait for ever because setting the
- // cache loader will wait for this method. So time out after
- // 1 second, which should be good enough to cause a failure
- // if the synchronization is broken.
- loaderAdded.await(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted");
- }
- }
- });
-
- Thread createBuckets = new Thread() {
- public void run() {
- PartitionRegionHelper.assignBucketsToPartitions(pr);
- }
- };
-
- createBuckets.start();
-
- CacheLoader loader = new CacheLoader() {
- @Override
- public void close() {}
-
- @Override
- public Object load(LoaderHelper helper) throws CacheLoaderException {
- return null;
- }
- };
-
- bucketCreated.await();
- pr.getAttributesMutator().setCacheLoader(loader);
- loaderAdded.countDown();
- createBuckets.join();
-
-
- // Assert that all buckets have received the cache loader
- for (BucketRegion bucket : pr.getDataStore().getAllLocalBucketRegions()) {
- assertEquals(loader, bucket.getCacheLoader());
- }
- }
-
/**
* This method checks whether the canAccomodateMoreBytesSafely returns false
after reaching the
* localMax memory.
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].