This is an automated email from the ASF dual-hosted git repository. dschneider pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 9614953 GEODE-3647: Fix race condition 9614953 is described below commit 96149530d82e0b62e9df1a043bfd7d0e01d3411a Author: Nick Reich <nre...@pivotal.io> AuthorDate: Mon Sep 18 14:39:51 2017 -0700 GEODE-3647: Fix race condition Partitioned region attributes mutation can fail to be applied to buckets created concurrently. Preventing bucket creation during the mutation of attributes solves this issue. --- .../geode/internal/cache/PartitionedRegion.java | 49 ++--- .../internal/cache/PartitionedRegionDataStore.java | 14 +- .../PartitionedRegionAttributesMutatorTest.java | 211 +++++++++++++++++++++ .../cache/PartitionedRegionDataStoreJUnitTest.java | 63 ------ 4 files changed, 243 insertions(+), 94 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 0ada1f9..0a0b802 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -100,6 +100,7 @@ import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.cache.partition.PartitionListener; import org.apache.geode.cache.partition.PartitionNotAvailableException; +import org.apache.geode.cache.partition.PartitionRegionHelper; import org.apache.geode.cache.persistence.PartitionOfflineException; import org.apache.geode.cache.persistence.PersistentID; import org.apache.geode.cache.query.FunctionDomainException; @@ -9150,14 +9151,14 @@ public class PartitionedRegion extends LocalRegion @Override public ExpirationAttributes setEntryTimeToLive(ExpirationAttributes timeToLive) { ExpirationAttributes attr = super.setEntryTimeToLive(timeToLive); - // Set to Bucket regions as well - if (this.getDataStore() != null) { // not for 
accessors - for (Object o : this.getDataStore().getAllLocalBuckets()) { - Map.Entry entry = (Map.Entry) o; - Region bucketRegion = (Region) entry.getValue(); - bucketRegion.getAttributesMutator().setEntryTimeToLive(timeToLive); - } - } + + /* + * All buckets must be created to make this change, otherwise it is possible for + * updatePRConfig(...) to make changes that cause bucket creation to live lock + */ + PartitionRegionHelper.assignBucketsToPartitions(this); + dataStore.lockBucketCreationAndVisit( + (bucketId, r) -> r.getAttributesMutator().setEntryTimeToLive(timeToLive)); updatePRConfig(getPRConfigWithLatestExpirationAttributes(), false); return attr; } @@ -9181,13 +9182,8 @@ public class PartitionedRegion extends LocalRegion public CustomExpiry setCustomEntryTimeToLive(CustomExpiry custom) { CustomExpiry expiry = super.setCustomEntryTimeToLive(custom); // Set to Bucket regions as well - if (this.getDataStore() != null) { // not for accessors - for (Object o : this.getDataStore().getAllLocalBuckets()) { - Map.Entry entry = (Map.Entry) o; - Region bucketRegion = (Region) entry.getValue(); - bucketRegion.getAttributesMutator().setCustomEntryTimeToLive(custom); - } - } + dataStore.lockBucketCreationAndVisit( + (bucketId, r) -> r.getAttributesMutator().setCustomEntryTimeToLive(custom)); return expiry; } @@ -9206,14 +9202,14 @@ public class PartitionedRegion extends LocalRegion @Override public ExpirationAttributes setEntryIdleTimeout(ExpirationAttributes idleTimeout) { ExpirationAttributes attr = super.setEntryIdleTimeout(idleTimeout); + /* + * All buckets must be created to make this change, otherwise it is possible for + * updatePRConfig(...) 
to make changes that cause bucket creation to live lock + */ + PartitionRegionHelper.assignBucketsToPartitions(this); // Set to Bucket regions as well - if (this.getDataStore() != null) { // not for accessors - for (Object o : this.getDataStore().getAllLocalBuckets()) { - Map.Entry entry = (Map.Entry) o; - Region bucketRegion = (Region) entry.getValue(); - bucketRegion.getAttributesMutator().setEntryIdleTimeout(idleTimeout); - } - } + dataStore.lockBucketCreationAndVisit( + (bucketId, r) -> r.getAttributesMutator().setEntryIdleTimeout(idleTimeout)); updatePRConfig(getPRConfigWithLatestExpirationAttributes(), false); return attr; } @@ -9228,13 +9224,8 @@ public class PartitionedRegion extends LocalRegion public CustomExpiry setCustomEntryIdleTimeout(CustomExpiry custom) { CustomExpiry expiry = super.setCustomEntryIdleTimeout(custom); // Set to Bucket regions as well - if (this.getDataStore() != null) { // not for accessors - for (Object o : this.getDataStore().getAllLocalBuckets()) { - Map.Entry entry = (Map.Entry) o; - Region bucketRegion = (Region) entry.getValue(); - bucketRegion.getAttributesMutator().setCustomEntryIdleTimeout(custom); - } - } + dataStore.lockBucketCreationAndVisit( + (bucketId, r) -> r.getAttributesMutator().setCustomEntryIdleTimeout(custom)); return expiry; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java index dc91f0f..91d230e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java @@ -953,6 +953,16 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { } } + protected void lockBucketCreationAndVisit(BucketVisitor visitor) { + StoppableWriteLock lock = this.bucketCreationLock.writeLock(); + lock.lock(); + try { + visitBuckets(visitor); + } 
finally { + lock.unlock(); + } + } + /** * Gets the total amount of memory in bytes allocated for all values for this PR in this VM. This * is the current memory (MB) watermark for data in this PR. @@ -2476,8 +2486,8 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { * Interface for visiting buckets */ // public visibility for tests - public static abstract class BucketVisitor { - abstract public void visit(Integer bucketId, Region r); + public interface BucketVisitor { + void visit(Integer bucketId, Region r); } // public visibility for tests diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java new file mode 100644 index 0000000..4e2e965 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java @@ -0,0 +1,211 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license * + * agreements. See the NOTICE file distributed with this work for additional information regarding * + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance with the License. You may obtain a * + * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by + * applicable law or agreed to in writing, software distributed under the License * is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. + * See the License for the specific language governing permissions and limitations under * the + * License. 
* + * + */ + +package org.apache.geode.internal.cache; + +import static org.junit.Assert.assertEquals; + +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.CacheLoader; +import org.apache.geode.cache.CacheLoaderException; +import org.apache.geode.cache.CustomExpiry; +import org.apache.geode.cache.ExpirationAction; +import org.apache.geode.cache.ExpirationAttributes; +import org.apache.geode.cache.LoaderHelper; +import org.apache.geode.cache.PartitionAttributes; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverAdapter; +import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder; +import org.apache.geode.test.dunit.rules.ServerStarterRule; +import org.apache.geode.test.junit.categories.IntegrationTest; + +@Category(IntegrationTest.class) +public class PartitionedRegionAttributesMutatorTest { + private static final String TEST_REGION_NAME = "testRegion"; + private static final int DEFAULT_WAIT_DURATION = 5; + private static final TimeUnit DEFAULT_WAIT_UNIT = TimeUnit.SECONDS; + + @Rule + public ServerStarterRule server = new ServerStarterRule().withAutoStart(); + + private final CountDownLatch mutationMade = new CountDownLatch(1); + private final CountDownLatch bucketCreated = new CountDownLatch(1); + private PartitionedRegion pr; + + @Test + public void testChangeCacheLoaderDuringBucketCreation() + throws InterruptedException, TimeoutException, ExecutionException { + createRegion(); 
+ CacheLoader loader = createTestCacheLoader(); + + CompletableFuture<Void> createBucket = + CompletableFuture.runAsync(() -> PartitionRegionHelper.assignBucketsToPartitions(pr)); + bucketCreated.await(); + pr.getAttributesMutator().setCacheLoader(loader); + mutationMade.countDown(); + createBucket.get(DEFAULT_WAIT_DURATION, DEFAULT_WAIT_UNIT); + + getAllBucketRegions(pr).forEach(region -> assertEquals(loader, region.getCacheLoader())); + } + + @Test + public void testChangeCustomEntryTtlDuringBucketCreation() + throws InterruptedException, ExecutionException { + createRegion(); + CustomExpiry customExpiry = createTestCustomExpiry(); + + CompletableFuture<Void> createBucket = + CompletableFuture.runAsync(() -> PartitionRegionHelper.assignBucketsToPartitions(pr)); + bucketCreated.await(); + pr.getAttributesMutator().setCustomEntryTimeToLive(customExpiry); + mutationMade.countDown(); + createBucket.get(); + + getAllBucketRegions(pr) + .forEach(region -> assertEquals(customExpiry, region.customEntryTimeToLive)); + } + + @Test + public void testChangeCustomEntryIdleTimeoutDuringBucketCreation() + throws InterruptedException, ExecutionException { + createRegion(); + CustomExpiry customExpiry = createTestCustomExpiry(); + + CompletableFuture<Void> createBucket = + CompletableFuture.runAsync(() -> PartitionRegionHelper.assignBucketsToPartitions(pr)); + bucketCreated.await(); + pr.getAttributesMutator().setCustomEntryIdleTimeout(customExpiry); + mutationMade.countDown(); + createBucket.get(); + + getAllBucketRegions(pr) + .forEach(region -> assertEquals(customExpiry, region.customEntryIdleTimeout)); + } + + @Test + public void testChangeEntryIdleTimeoutDuringBucketCreation() + throws InterruptedException, ExecutionException { + createRegionWithFewBuckets(); + + CompletableFuture<Void> createBucket = + CompletableFuture.runAsync(() -> PartitionRegionHelper.assignBucketsToPartitions(pr)); + bucketCreated.await(); + ExpirationAttributes expirationAttributes = + new 
ExpirationAttributes(1000, ExpirationAction.DESTROY); + pr.getAttributesMutator().setEntryIdleTimeout(expirationAttributes); + mutationMade.countDown(); + createBucket.get(); + + getAllBucketRegions(pr) + .forEach(region -> assertEquals(expirationAttributes, region.getEntryIdleTimeout())); + } + + @Test + public void testChangeEntryTtlDuringBucketCreation() + throws InterruptedException, ExecutionException { + createRegionWithFewBuckets(); + + CompletableFuture<Void> createBucket = + CompletableFuture.runAsync(() -> PartitionRegionHelper.assignBucketsToPartitions(pr)); + bucketCreated.await(); + ExpirationAttributes expirationAttributes = + new ExpirationAttributes(1000, ExpirationAction.DESTROY); + pr.getAttributesMutator().setEntryTimeToLive(expirationAttributes); + mutationMade.countDown(); + createBucket.get(); + + getAllBucketRegions(pr) + .forEach(region -> assertEquals(expirationAttributes, region.getEntryTimeToLive())); + } + + private void createRegion() { + pr = (PartitionedRegion) server.getCache().createRegionFactory(RegionShortcut.PARTITION) + .setStatisticsEnabled(true).create(TEST_REGION_NAME); + setRegionObserver(); + } + + private void createRegionWithFewBuckets() { + PartitionAttributes partitionAttributes = + new PartitionAttributesFactory().setTotalNumBuckets(5).create(); + pr = (PartitionedRegion) server.getCache().createRegionFactory(RegionShortcut.PARTITION) + .setStatisticsEnabled(true).setPartitionAttributes(partitionAttributes) + .create(TEST_REGION_NAME); + setRegionObserver(); + } + + // Adds an observer which will block bucket creation and wait for a loader to be added + private void setRegionObserver() { + PartitionedRegionObserverHolder.setInstance(new PartitionedRegionObserverAdapter() { + @Override + public void beforeAssignBucket(PartitionedRegion partitionedRegion, int bucketId) { + try { + // Indicate that the bucket has been created + bucketCreated.countDown(); + + // Wait for the loader to be added. 
if the synchronization + // is correct, this would wait for ever because setting the + // cache loader will wait for this method. So time out after + // 1 second, which should be good enough to cause a failure + // if the synchronization is broken. + mutationMade.await(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted"); + } + } + }); + } + + private Set<BucketRegion> getAllBucketRegions(PartitionedRegion pr) { + return pr.getDataStore().getAllLocalBucketRegions(); + } + + private CacheLoader createTestCacheLoader() { + return new CacheLoader() { + @Override + public void close() {} + + @Override + public Object load(LoaderHelper helper) throws CacheLoaderException { + return null; + } + }; + } + + private CustomExpiry createTestCustomExpiry() { + return new CustomExpiry() { + @Override + public ExpirationAttributes getExpiry(Region.Entry entry) { + return null; + } + + @Override + public void close() { + + } + }; + } +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java index cd0917e..d9396da 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java @@ -15,10 +15,7 @@ package org.apache.geode.internal.cache; import org.apache.geode.cache.*; -import org.apache.geode.cache.partition.PartitionRegionHelper; import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverAdapter; -import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder; import org.apache.geode.test.junit.categories.IntegrationTest; import org.junit.After; import org.junit.Before; @@ -26,8 +23,6 @@ import org.junit.Test; import 
org.junit.experimental.categories.Category; import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; import static org.junit.Assert.*; @@ -108,64 +103,6 @@ public class PartitionedRegionDataStoreJUnitTest { } - @Test - public void testChangeCacheLoaderDuringBucketCreation() throws Exception { - final PartitionedRegion pr = - (PartitionedRegion) cache.createRegionFactory(RegionShortcut.PARTITION) - .create("testChangeCacheLoaderDuringBucketCreation"); - - // Add an observer which will block bucket creation and wait for a loader to be added - final CountDownLatch loaderAdded = new CountDownLatch(1); - final CountDownLatch bucketCreated = new CountDownLatch(1); - PartitionedRegionObserverHolder.setInstance(new PartitionedRegionObserverAdapter() { - @Override - public void beforeAssignBucket(PartitionedRegion partitionedRegion, int bucketId) { - try { - // Indicate that the bucket has been created - bucketCreated.countDown(); - - // Wait for the loader to be added. if the synchronization - // is correct, this would wait for ever because setting the - // cache loader will wait for this method. So time out after - // 1 second, which should be good enough to cause a failure - // if the synchronization is broken. 
- loaderAdded.await(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted"); - } - } - }); - - Thread createBuckets = new Thread() { - public void run() { - PartitionRegionHelper.assignBucketsToPartitions(pr); - } - }; - - createBuckets.start(); - - CacheLoader loader = new CacheLoader() { - @Override - public void close() {} - - @Override - public Object load(LoaderHelper helper) throws CacheLoaderException { - return null; - } - }; - - bucketCreated.await(); - pr.getAttributesMutator().setCacheLoader(loader); - loaderAdded.countDown(); - createBuckets.join(); - - - // Assert that all buckets have received the cache loader - for (BucketRegion bucket : pr.getDataStore().getAllLocalBucketRegions()) { - assertEquals(loader, bucket.getCacheLoader()); - } - } - /** * This method checks whether the canAccomodateMoreBytesSafely returns false after reaching the * localMax memory. -- To stop receiving notification emails like this one, please contact commits@geode.apache.org.