This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 5cb2a59 Revert "GEODE-3647: Fix race condition"
5cb2a59 is described below
commit 5cb2a591199845190751f52e4da758c5ccd4d44e
Author: Darrel Schneider <[email protected]>
AuthorDate: Mon Oct 2 13:46:07 2017 -0700
Revert "GEODE-3647: Fix race condition"
This reverts commit 96149530d82e0b62e9df1a043bfd7d0e01d3411a.
---
.../geode/internal/cache/PartitionedRegion.java | 49 +++--
.../internal/cache/PartitionedRegionDataStore.java | 14 +-
.../PartitionedRegionAttributesMutatorTest.java | 211 ---------------------
.../cache/PartitionedRegionDataStoreJUnitTest.java | 63 ++++++
4 files changed, 94 insertions(+), 243 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 0a0b802..0ada1f9 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -100,7 +100,6 @@ import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.partition.PartitionListener;
import org.apache.geode.cache.partition.PartitionNotAvailableException;
-import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.persistence.PersistentID;
import org.apache.geode.cache.query.FunctionDomainException;
@@ -9151,14 +9150,14 @@ public class PartitionedRegion extends LocalRegion
@Override
public ExpirationAttributes setEntryTimeToLive(ExpirationAttributes
timeToLive) {
ExpirationAttributes attr = super.setEntryTimeToLive(timeToLive);
-
- /*
- * All buckets must be created to make this change, otherwise it is
possible for
- * updatePRConfig(...) to make changes that cause bucket creation to live
lock
- */
- PartitionRegionHelper.assignBucketsToPartitions(this);
- dataStore.lockBucketCreationAndVisit(
- (bucketId, r) ->
r.getAttributesMutator().setEntryTimeToLive(timeToLive));
+ // Set to Bucket regions as well
+ if (this.getDataStore() != null) { // not for accessors
+ for (Object o : this.getDataStore().getAllLocalBuckets()) {
+ Map.Entry entry = (Map.Entry) o;
+ Region bucketRegion = (Region) entry.getValue();
+ bucketRegion.getAttributesMutator().setEntryTimeToLive(timeToLive);
+ }
+ }
updatePRConfig(getPRConfigWithLatestExpirationAttributes(), false);
return attr;
}
@@ -9182,8 +9181,13 @@ public class PartitionedRegion extends LocalRegion
public CustomExpiry setCustomEntryTimeToLive(CustomExpiry custom) {
CustomExpiry expiry = super.setCustomEntryTimeToLive(custom);
// Set to Bucket regions as well
- dataStore.lockBucketCreationAndVisit(
- (bucketId, r) ->
r.getAttributesMutator().setCustomEntryTimeToLive(custom));
+ if (this.getDataStore() != null) { // not for accessors
+ for (Object o : this.getDataStore().getAllLocalBuckets()) {
+ Map.Entry entry = (Map.Entry) o;
+ Region bucketRegion = (Region) entry.getValue();
+ bucketRegion.getAttributesMutator().setCustomEntryTimeToLive(custom);
+ }
+ }
return expiry;
}
@@ -9202,14 +9206,14 @@ public class PartitionedRegion extends LocalRegion
@Override
public ExpirationAttributes setEntryIdleTimeout(ExpirationAttributes
idleTimeout) {
ExpirationAttributes attr = super.setEntryIdleTimeout(idleTimeout);
- /*
- * All buckets must be created to make this change, otherwise it is
possible for
- * updatePRConfig(...) to make changes that cause bucket creation to live
lock
- */
- PartitionRegionHelper.assignBucketsToPartitions(this);
// Set to Bucket regions as well
- dataStore.lockBucketCreationAndVisit(
- (bucketId, r) ->
r.getAttributesMutator().setEntryIdleTimeout(idleTimeout));
+ if (this.getDataStore() != null) { // not for accessors
+ for (Object o : this.getDataStore().getAllLocalBuckets()) {
+ Map.Entry entry = (Map.Entry) o;
+ Region bucketRegion = (Region) entry.getValue();
+ bucketRegion.getAttributesMutator().setEntryIdleTimeout(idleTimeout);
+ }
+ }
updatePRConfig(getPRConfigWithLatestExpirationAttributes(), false);
return attr;
}
@@ -9224,8 +9228,13 @@ public class PartitionedRegion extends LocalRegion
public CustomExpiry setCustomEntryIdleTimeout(CustomExpiry custom) {
CustomExpiry expiry = super.setCustomEntryIdleTimeout(custom);
// Set to Bucket regions as well
- dataStore.lockBucketCreationAndVisit(
- (bucketId, r) ->
r.getAttributesMutator().setCustomEntryIdleTimeout(custom));
+ if (this.getDataStore() != null) { // not for accessors
+ for (Object o : this.getDataStore().getAllLocalBuckets()) {
+ Map.Entry entry = (Map.Entry) o;
+ Region bucketRegion = (Region) entry.getValue();
+ bucketRegion.getAttributesMutator().setCustomEntryIdleTimeout(custom);
+ }
+ }
return expiry;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 91d230e..dc91f0f 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -953,16 +953,6 @@ public class PartitionedRegionDataStore implements
HasCachePerfStats {
}
}
- protected void lockBucketCreationAndVisit(BucketVisitor visitor) {
- StoppableWriteLock lock = this.bucketCreationLock.writeLock();
- lock.lock();
- try {
- visitBuckets(visitor);
- } finally {
- lock.unlock();
- }
- }
-
/**
* Gets the total amount of memory in bytes allocated for all values for
this PR in this VM. This
* is the current memory (MB) watermark for data in this PR.
@@ -2486,8 +2476,8 @@ public class PartitionedRegionDataStore implements
HasCachePerfStats {
* Interface for visiting buckets
*/
// public visibility for tests
- public interface BucketVisitor {
- void visit(Integer bucketId, Region r);
+ public static abstract class BucketVisitor {
+ abstract public void visit(Integer bucketId, Region r);
}
// public visibility for tests
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java
deleted file mode 100644
index 4e2e965..0000000
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license *
- * agreements. See the NOTICE file distributed with this work for additional
information regarding *
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance with the
License. You may obtain a *
- * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * *
Unless required by
- * applicable law or agreed to in writing, software distributed under the
License * is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express * or implied.
- * See the License for the specific language governing permissions and
limitations under * the
- * License. *
- *
- */
-
-package org.apache.geode.internal.cache;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.CacheLoader;
-import org.apache.geode.cache.CacheLoaderException;
-import org.apache.geode.cache.CustomExpiry;
-import org.apache.geode.cache.ExpirationAction;
-import org.apache.geode.cache.ExpirationAttributes;
-import org.apache.geode.cache.LoaderHelper;
-import org.apache.geode.cache.PartitionAttributes;
-import org.apache.geode.cache.PartitionAttributesFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.partition.PartitionRegionHelper;
-import
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverAdapter;
-import
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder;
-import org.apache.geode.test.dunit.rules.ServerStarterRule;
-import org.apache.geode.test.junit.categories.IntegrationTest;
-
-@Category(IntegrationTest.class)
-public class PartitionedRegionAttributesMutatorTest {
- private static final String TEST_REGION_NAME = "testRegion";
- private static final int DEFAULT_WAIT_DURATION = 5;
- private static final TimeUnit DEFAULT_WAIT_UNIT = TimeUnit.SECONDS;
-
- @Rule
- public ServerStarterRule server = new ServerStarterRule().withAutoStart();
-
- private final CountDownLatch mutationMade = new CountDownLatch(1);
- private final CountDownLatch bucketCreated = new CountDownLatch(1);
- private PartitionedRegion pr;
-
- @Test
- public void testChangeCacheLoaderDuringBucketCreation()
- throws InterruptedException, TimeoutException, ExecutionException {
- createRegion();
- CacheLoader loader = createTestCacheLoader();
-
- CompletableFuture<Void> createBucket =
- CompletableFuture.runAsync(() ->
PartitionRegionHelper.assignBucketsToPartitions(pr));
- bucketCreated.await();
- pr.getAttributesMutator().setCacheLoader(loader);
- mutationMade.countDown();
- createBucket.get(DEFAULT_WAIT_DURATION, DEFAULT_WAIT_UNIT);
-
- getAllBucketRegions(pr).forEach(region -> assertEquals(loader,
region.getCacheLoader()));
- }
-
- @Test
- public void testChangeCustomEntryTtlDuringBucketCreation()
- throws InterruptedException, ExecutionException {
- createRegion();
- CustomExpiry customExpiry = createTestCustomExpiry();
-
- CompletableFuture<Void> createBucket =
- CompletableFuture.runAsync(() ->
PartitionRegionHelper.assignBucketsToPartitions(pr));
- bucketCreated.await();
- pr.getAttributesMutator().setCustomEntryTimeToLive(customExpiry);
- mutationMade.countDown();
- createBucket.get();
-
- getAllBucketRegions(pr)
- .forEach(region -> assertEquals(customExpiry,
region.customEntryTimeToLive));
- }
-
- @Test
- public void testChangeCustomEntryIdleTimeoutDuringBucketCreation()
- throws InterruptedException, ExecutionException {
- createRegion();
- CustomExpiry customExpiry = createTestCustomExpiry();
-
- CompletableFuture<Void> createBucket =
- CompletableFuture.runAsync(() ->
PartitionRegionHelper.assignBucketsToPartitions(pr));
- bucketCreated.await();
- pr.getAttributesMutator().setCustomEntryIdleTimeout(customExpiry);
- mutationMade.countDown();
- createBucket.get();
-
- getAllBucketRegions(pr)
- .forEach(region -> assertEquals(customExpiry,
region.customEntryIdleTimeout));
- }
-
- @Test
- public void testChangeEntryIdleTimeoutDuringBucketCreation()
- throws InterruptedException, ExecutionException {
- createRegionWithFewBuckets();
-
- CompletableFuture<Void> createBucket =
- CompletableFuture.runAsync(() ->
PartitionRegionHelper.assignBucketsToPartitions(pr));
- bucketCreated.await();
- ExpirationAttributes expirationAttributes =
- new ExpirationAttributes(1000, ExpirationAction.DESTROY);
- pr.getAttributesMutator().setEntryIdleTimeout(expirationAttributes);
- mutationMade.countDown();
- createBucket.get();
-
- getAllBucketRegions(pr)
- .forEach(region -> assertEquals(expirationAttributes,
region.getEntryIdleTimeout()));
- }
-
- @Test
- public void testChangeEntryTtlDuringBucketCreation()
- throws InterruptedException, ExecutionException {
- createRegionWithFewBuckets();
-
- CompletableFuture<Void> createBucket =
- CompletableFuture.runAsync(() ->
PartitionRegionHelper.assignBucketsToPartitions(pr));
- bucketCreated.await();
- ExpirationAttributes expirationAttributes =
- new ExpirationAttributes(1000, ExpirationAction.DESTROY);
- pr.getAttributesMutator().setEntryTimeToLive(expirationAttributes);
- mutationMade.countDown();
- createBucket.get();
-
- getAllBucketRegions(pr)
- .forEach(region -> assertEquals(expirationAttributes,
region.getEntryTimeToLive()));
- }
-
- private void createRegion() {
- pr = (PartitionedRegion)
server.getCache().createRegionFactory(RegionShortcut.PARTITION)
- .setStatisticsEnabled(true).create(TEST_REGION_NAME);
- setRegionObserver();
- }
-
- private void createRegionWithFewBuckets() {
- PartitionAttributes partitionAttributes =
- new PartitionAttributesFactory().setTotalNumBuckets(5).create();
- pr = (PartitionedRegion)
server.getCache().createRegionFactory(RegionShortcut.PARTITION)
- .setStatisticsEnabled(true).setPartitionAttributes(partitionAttributes)
- .create(TEST_REGION_NAME);
- setRegionObserver();
- }
-
- // Adds an observer which will block bucket creation and wait for a loader
to be added
- private void setRegionObserver() {
- PartitionedRegionObserverHolder.setInstance(new
PartitionedRegionObserverAdapter() {
- @Override
- public void beforeAssignBucket(PartitionedRegion partitionedRegion, int
bucketId) {
- try {
- // Indicate that the bucket has been created
- bucketCreated.countDown();
-
- // Wait for the loader to be added. if the synchronization
- // is correct, this would wait for ever because setting the
- // cache loader will wait for this method. So time out after
- // 1 second, which should be good enough to cause a failure
- // if the synchronization is broken.
- mutationMade.await(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted");
- }
- }
- });
- }
-
- private Set<BucketRegion> getAllBucketRegions(PartitionedRegion pr) {
- return pr.getDataStore().getAllLocalBucketRegions();
- }
-
- private CacheLoader createTestCacheLoader() {
- return new CacheLoader() {
- @Override
- public void close() {}
-
- @Override
- public Object load(LoaderHelper helper) throws CacheLoaderException {
- return null;
- }
- };
- }
-
- private CustomExpiry createTestCustomExpiry() {
- return new CustomExpiry() {
- @Override
- public ExpirationAttributes getExpiry(Region.Entry entry) {
- return null;
- }
-
- @Override
- public void close() {
-
- }
- };
- }
-}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
index d9396da..cd0917e 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
@@ -15,7 +15,10 @@
package org.apache.geode.internal.cache;
import org.apache.geode.cache.*;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.distributed.DistributedSystem;
+import
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverAdapter;
+import
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder;
import org.apache.geode.test.junit.categories.IntegrationTest;
import org.junit.After;
import org.junit.Before;
@@ -23,6 +26,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.junit.Assert.*;
@@ -103,6 +108,64 @@ public class PartitionedRegionDataStoreJUnitTest {
}
+ @Test
+ public void testChangeCacheLoaderDuringBucketCreation() throws Exception {
+ final PartitionedRegion pr =
+ (PartitionedRegion) cache.createRegionFactory(RegionShortcut.PARTITION)
+ .create("testChangeCacheLoaderDuringBucketCreation");
+
+ // Add an observer which will block bucket creation and wait for a loader
to be added
+ final CountDownLatch loaderAdded = new CountDownLatch(1);
+ final CountDownLatch bucketCreated = new CountDownLatch(1);
+ PartitionedRegionObserverHolder.setInstance(new
PartitionedRegionObserverAdapter() {
+ @Override
+ public void beforeAssignBucket(PartitionedRegion partitionedRegion, int
bucketId) {
+ try {
+ // Indicate that the bucket has been created
+ bucketCreated.countDown();
+
+ // Wait for the loader to be added. if the synchronization
+ // is correct, this would wait for ever because setting the
+ // cache loader will wait for this method. So time out after
+ // 1 second, which should be good enough to cause a failure
+ // if the synchronization is broken.
+ loaderAdded.await(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted");
+ }
+ }
+ });
+
+ Thread createBuckets = new Thread() {
+ public void run() {
+ PartitionRegionHelper.assignBucketsToPartitions(pr);
+ }
+ };
+
+ createBuckets.start();
+
+ CacheLoader loader = new CacheLoader() {
+ @Override
+ public void close() {}
+
+ @Override
+ public Object load(LoaderHelper helper) throws CacheLoaderException {
+ return null;
+ }
+ };
+
+ bucketCreated.await();
+ pr.getAttributesMutator().setCacheLoader(loader);
+ loaderAdded.countDown();
+ createBuckets.join();
+
+
+ // Assert that all buckets have received the cache loader
+ for (BucketRegion bucket : pr.getDataStore().getAllLocalBucketRegions()) {
+ assertEquals(loader, bucket.getCacheLoader());
+ }
+ }
+
/**
* This method checks whether the canAccomodateMoreBytesSafely returns false
after reaching the
* localMax memory.
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.