This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9614953  GEODE-3647: Fix race condition
9614953 is described below

commit 96149530d82e0b62e9df1a043bfd7d0e01d3411a
Author: Nick Reich <nre...@pivotal.io>
AuthorDate: Mon Sep 18 14:39:51 2017 -0700

    GEODE-3647: Fix race condition
    
      Partitioned region attributes mutation can fail to be applied to
      buckets created concurrently. Preventing bucket creation during the
      mutation of attributes solves this issue.
---
 .../geode/internal/cache/PartitionedRegion.java    |  49 ++---
 .../internal/cache/PartitionedRegionDataStore.java |  14 +-
 .../PartitionedRegionAttributesMutatorTest.java    | 211 +++++++++++++++++++++
 .../cache/PartitionedRegionDataStoreJUnitTest.java |  63 ------
 4 files changed, 243 insertions(+), 94 deletions(-)

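The essence of the fix below is that attribute mutation now takes the write side of the data store's bucket-creation lock, so no bucket can be created while the new attributes are being pushed to the buckets that already exist. The following is a minimal, self-contained sketch of that pattern for illustration only; the class and field names here (BucketStoreSketch, Bucket, the Object attribute) are placeholders and not the actual Geode internals, which use PartitionedRegionDataStore.bucketCreationLock and the lockBucketCreationAndVisit method shown in the diff.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.function.BiConsumer;

    // Sketch: bucket creation holds the read lock, attribute mutation holds the
    // write lock while it visits every bucket that already exists.
    class BucketStoreSketch {
      static class Bucket {
        volatile Object entryTimeToLive; // stand-in for the mutated attribute
      }

      private final ReentrantReadWriteLock bucketCreationLock = new ReentrantReadWriteLock();
      private final Map<Integer, Bucket> buckets = new ConcurrentHashMap<>();

      // Creators may run concurrently with each other, but never while a mutation
      // holds the write lock, so no bucket can miss the new attribute value.
      void createBucket(int bucketId, Object currentAttribute) {
        bucketCreationLock.readLock().lock();
        try {
          Bucket b = new Bucket();
          b.entryTimeToLive = currentAttribute; // new buckets copy the current value
          buckets.putIfAbsent(bucketId, b);
        } finally {
          bucketCreationLock.readLock().unlock();
        }
      }

      // Mirrors the idea of lockBucketCreationAndVisit: block creation, then
      // apply the change to every existing bucket.
      void lockBucketCreationAndVisit(BiConsumer<Integer, Bucket> visitor) {
        bucketCreationLock.writeLock().lock();
        try {
          buckets.forEach(visitor);
        } finally {
          bucketCreationLock.writeLock().unlock();
        }
      }

      void setEntryTimeToLive(Object newValue) {
        lockBucketCreationAndVisit((id, bucket) -> bucket.entryTimeToLive = newValue);
      }
    }
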
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 0ada1f9..0a0b802 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -100,6 +100,7 @@ import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.partition.PartitionListener;
 import org.apache.geode.cache.partition.PartitionNotAvailableException;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.cache.persistence.PartitionOfflineException;
 import org.apache.geode.cache.persistence.PersistentID;
 import org.apache.geode.cache.query.FunctionDomainException;
@@ -9150,14 +9151,14 @@ public class PartitionedRegion extends LocalRegion
   @Override
   public ExpirationAttributes setEntryTimeToLive(ExpirationAttributes timeToLive) {
     ExpirationAttributes attr = super.setEntryTimeToLive(timeToLive);
-    // Set to Bucket regions as well
-    if (this.getDataStore() != null) { // not for accessors
-      for (Object o : this.getDataStore().getAllLocalBuckets()) {
-        Map.Entry entry = (Map.Entry) o;
-        Region bucketRegion = (Region) entry.getValue();
-        bucketRegion.getAttributesMutator().setEntryTimeToLive(timeToLive);
-      }
-    }
+
+    /*
+     * All buckets must be created to make this change, otherwise it is possible for
+     * updatePRConfig(...) to make changes that cause bucket creation to live lock
+     */
+    PartitionRegionHelper.assignBucketsToPartitions(this);
+    dataStore.lockBucketCreationAndVisit(
+        (bucketId, r) -> r.getAttributesMutator().setEntryTimeToLive(timeToLive));
     updatePRConfig(getPRConfigWithLatestExpirationAttributes(), false);
     return attr;
   }
@@ -9181,13 +9182,8 @@ public class PartitionedRegion extends LocalRegion
   public CustomExpiry setCustomEntryTimeToLive(CustomExpiry custom) {
     CustomExpiry expiry = super.setCustomEntryTimeToLive(custom);
     // Set to Bucket regions as well
-    if (this.getDataStore() != null) { // not for accessors
-      for (Object o : this.getDataStore().getAllLocalBuckets()) {
-        Map.Entry entry = (Map.Entry) o;
-        Region bucketRegion = (Region) entry.getValue();
-        bucketRegion.getAttributesMutator().setCustomEntryTimeToLive(custom);
-      }
-    }
+    dataStore.lockBucketCreationAndVisit(
+        (bucketId, r) -> r.getAttributesMutator().setCustomEntryTimeToLive(custom));
     return expiry;
   }
 
@@ -9206,14 +9202,14 @@ public class PartitionedRegion extends LocalRegion
   @Override
   public ExpirationAttributes setEntryIdleTimeout(ExpirationAttributes idleTimeout) {
     ExpirationAttributes attr = super.setEntryIdleTimeout(idleTimeout);
+    /*
+     * All buckets must be created to make this change, otherwise it is possible for
+     * updatePRConfig(...) to make changes that cause bucket creation to live lock
+     */
+    PartitionRegionHelper.assignBucketsToPartitions(this);
     // Set to Bucket regions as well
-    if (this.getDataStore() != null) { // not for accessors
-      for (Object o : this.getDataStore().getAllLocalBuckets()) {
-        Map.Entry entry = (Map.Entry) o;
-        Region bucketRegion = (Region) entry.getValue();
-        bucketRegion.getAttributesMutator().setEntryIdleTimeout(idleTimeout);
-      }
-    }
+    dataStore.lockBucketCreationAndVisit(
+        (bucketId, r) -> r.getAttributesMutator().setEntryIdleTimeout(idleTimeout));
     updatePRConfig(getPRConfigWithLatestExpirationAttributes(), false);
     return attr;
   }
@@ -9228,13 +9224,8 @@ public class PartitionedRegion extends LocalRegion
   public CustomExpiry setCustomEntryIdleTimeout(CustomExpiry custom) {
     CustomExpiry expiry = super.setCustomEntryIdleTimeout(custom);
     // Set to Bucket regions as well
-    if (this.getDataStore() != null) { // not for accessors
-      for (Object o : this.getDataStore().getAllLocalBuckets()) {
-        Map.Entry entry = (Map.Entry) o;
-        Region bucketRegion = (Region) entry.getValue();
-        bucketRegion.getAttributesMutator().setCustomEntryIdleTimeout(custom);
-      }
-    }
+    dataStore.lockBucketCreationAndVisit(
+        (bucketId, r) -> r.getAttributesMutator().setCustomEntryIdleTimeout(custom));
     return expiry;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index dc91f0f..91d230e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -953,6 +953,16 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
     }
   }
 
+  protected void lockBucketCreationAndVisit(BucketVisitor visitor) {
+    StoppableWriteLock lock = this.bucketCreationLock.writeLock();
+    lock.lock();
+    try {
+      visitBuckets(visitor);
+    } finally {
+      lock.unlock();
+    }
+  }
+
   /**
    * Gets the total amount of memory in bytes allocated for all values for this PR in this VM. This
    * is the current memory (MB) watermark for data in this PR.
@@ -2476,8 +2486,8 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
    * Interface for visiting buckets
    */
   // public visibility for tests
-  public static abstract class BucketVisitor {
-    abstract public void visit(Integer bucketId, Region r);
+  public interface BucketVisitor {
+    void visit(Integer bucketId, Region r);
   }
 
   // public visibility for tests
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java
new file mode 100644
index 0000000..4e2e965
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java
@@ -0,0 +1,211 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License. *
+ *
+ */
+
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.CustomExpiry;
+import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverAdapter;
+import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder;
+import org.apache.geode.test.dunit.rules.ServerStarterRule;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class PartitionedRegionAttributesMutatorTest {
+  private static final String TEST_REGION_NAME = "testRegion";
+  private static final int DEFAULT_WAIT_DURATION = 5;
+  private static final TimeUnit DEFAULT_WAIT_UNIT = TimeUnit.SECONDS;
+
+  @Rule
+  public ServerStarterRule server = new ServerStarterRule().withAutoStart();
+
+  private final CountDownLatch mutationMade = new CountDownLatch(1);
+  private final CountDownLatch bucketCreated = new CountDownLatch(1);
+  private PartitionedRegion pr;
+
+  @Test
+  public void testChangeCacheLoaderDuringBucketCreation()
+      throws InterruptedException, TimeoutException, ExecutionException {
+    createRegion();
+    CacheLoader loader = createTestCacheLoader();
+
+    CompletableFuture<Void> createBucket =
+        CompletableFuture.runAsync(() -> PartitionRegionHelper.assignBucketsToPartitions(pr));
+    bucketCreated.await();
+    pr.getAttributesMutator().setCacheLoader(loader);
+    mutationMade.countDown();
+    createBucket.get(DEFAULT_WAIT_DURATION, DEFAULT_WAIT_UNIT);
+
+    getAllBucketRegions(pr).forEach(region -> assertEquals(loader, region.getCacheLoader()));
+  }
+
+  @Test
+  public void testChangeCustomEntryTtlDuringBucketCreation()
+      throws InterruptedException, ExecutionException {
+    createRegion();
+    CustomExpiry customExpiry = createTestCustomExpiry();
+
+    CompletableFuture<Void> createBucket =
+        CompletableFuture.runAsync(() -> PartitionRegionHelper.assignBucketsToPartitions(pr));
+    bucketCreated.await();
+    pr.getAttributesMutator().setCustomEntryTimeToLive(customExpiry);
+    mutationMade.countDown();
+    createBucket.get();
+
+    getAllBucketRegions(pr)
+        .forEach(region -> assertEquals(customExpiry, region.customEntryTimeToLive));
+  }
+
+  @Test
+  public void testChangeCustomEntryIdleTimeoutDuringBucketCreation()
+      throws InterruptedException, ExecutionException {
+    createRegion();
+    CustomExpiry customExpiry = createTestCustomExpiry();
+
+    CompletableFuture<Void> createBucket =
+        CompletableFuture.runAsync(() -> PartitionRegionHelper.assignBucketsToPartitions(pr));
+    bucketCreated.await();
+    pr.getAttributesMutator().setCustomEntryIdleTimeout(customExpiry);
+    mutationMade.countDown();
+    createBucket.get();
+
+    getAllBucketRegions(pr)
+        .forEach(region -> assertEquals(customExpiry, region.customEntryIdleTimeout));
+  }
+
+  @Test
+  public void testChangeEntryIdleTimeoutDuringBucketCreation()
+      throws InterruptedException, ExecutionException {
+    createRegionWithFewBuckets();
+
+    CompletableFuture<Void> createBucket =
+        CompletableFuture.runAsync(() -> PartitionRegionHelper.assignBucketsToPartitions(pr));
+    bucketCreated.await();
+    ExpirationAttributes expirationAttributes =
+        new ExpirationAttributes(1000, ExpirationAction.DESTROY);
+    pr.getAttributesMutator().setEntryIdleTimeout(expirationAttributes);
+    mutationMade.countDown();
+    createBucket.get();
+
+    getAllBucketRegions(pr)
+        .forEach(region -> assertEquals(expirationAttributes, region.getEntryIdleTimeout()));
+  }
+
+  @Test
+  public void testChangeEntryTtlDuringBucketCreation()
+      throws InterruptedException, ExecutionException {
+    createRegionWithFewBuckets();
+
+    CompletableFuture<Void> createBucket =
+        CompletableFuture.runAsync(() -> PartitionRegionHelper.assignBucketsToPartitions(pr));
+    bucketCreated.await();
+    ExpirationAttributes expirationAttributes =
+        new ExpirationAttributes(1000, ExpirationAction.DESTROY);
+    pr.getAttributesMutator().setEntryTimeToLive(expirationAttributes);
+    mutationMade.countDown();
+    createBucket.get();
+
+    getAllBucketRegions(pr)
+        .forEach(region -> assertEquals(expirationAttributes, region.getEntryTimeToLive()));
+  }
+
+  private void createRegion() {
+    pr = (PartitionedRegion) server.getCache().createRegionFactory(RegionShortcut.PARTITION)
+        .setStatisticsEnabled(true).create(TEST_REGION_NAME);
+    setRegionObserver();
+  }
+
+  private void createRegionWithFewBuckets() {
+    PartitionAttributes partitionAttributes =
+        new PartitionAttributesFactory().setTotalNumBuckets(5).create();
+    pr = (PartitionedRegion) server.getCache().createRegionFactory(RegionShortcut.PARTITION)
+        .setStatisticsEnabled(true).setPartitionAttributes(partitionAttributes)
+        .create(TEST_REGION_NAME);
+    setRegionObserver();
+  }
+
+  // Adds an observer which will block bucket creation and wait for a loader to be added
+  private void setRegionObserver() {
+    PartitionedRegionObserverHolder.setInstance(new PartitionedRegionObserverAdapter() {
+      @Override
+      public void beforeAssignBucket(PartitionedRegion partitionedRegion, int bucketId) {
+        try {
+          // Indicate that the bucket has been created
+          bucketCreated.countDown();
+
+          // Wait for the mutation to be made. If the synchronization
+          // is correct, this would wait forever because the attribute
+          // mutation will wait for this method. So time out after
+          // 1 second, which should be good enough to cause a failure
+          // if the synchronization is broken.
+          mutationMade.await(1, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Interrupted");
+        }
+      }
+    });
+  }
+
+  private Set<BucketRegion> getAllBucketRegions(PartitionedRegion pr) {
+    return pr.getDataStore().getAllLocalBucketRegions();
+  }
+
+  private CacheLoader createTestCacheLoader() {
+    return new CacheLoader() {
+      @Override
+      public void close() {}
+
+      @Override
+      public Object load(LoaderHelper helper) throws CacheLoaderException {
+        return null;
+      }
+    };
+  }
+
+  private CustomExpiry createTestCustomExpiry() {
+    return new CustomExpiry() {
+      @Override
+      public ExpirationAttributes getExpiry(Region.Entry entry) {
+        return null;
+      }
+
+      @Override
+      public void close() {
+
+      }
+    };
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
index cd0917e..d9396da 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
@@ -15,10 +15,7 @@
 package org.apache.geode.internal.cache;
 
 import org.apache.geode.cache.*;
-import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverAdapter;
-import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
@@ -26,8 +23,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.junit.Assert.*;
@@ -108,64 +103,6 @@ public class PartitionedRegionDataStoreJUnitTest {
 
   }
 
-  @Test
-  public void testChangeCacheLoaderDuringBucketCreation() throws Exception {
-    final PartitionedRegion pr =
-        (PartitionedRegion) cache.createRegionFactory(RegionShortcut.PARTITION)
-            .create("testChangeCacheLoaderDuringBucketCreation");
-
-    // Add an observer which will block bucket creation and wait for a loader to be added
-    final CountDownLatch loaderAdded = new CountDownLatch(1);
-    final CountDownLatch bucketCreated = new CountDownLatch(1);
-    PartitionedRegionObserverHolder.setInstance(new PartitionedRegionObserverAdapter() {
-      @Override
-      public void beforeAssignBucket(PartitionedRegion partitionedRegion, int bucketId) {
-        try {
-          // Indicate that the bucket has been created
-          bucketCreated.countDown();
-
-          // Wait for the loader to be added. if the synchronization
-          // is correct, this would wait for ever because setting the
-          // cache loader will wait for this method. So time out after
-          // 1 second, which should be good enough to cause a failure
-          // if the synchronization is broken.
-          loaderAdded.await(1, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-          throw new RuntimeException("Interrupted");
-        }
-      }
-    });
-
-    Thread createBuckets = new Thread() {
-      public void run() {
-        PartitionRegionHelper.assignBucketsToPartitions(pr);
-      }
-    };
-
-    createBuckets.start();
-
-    CacheLoader loader = new CacheLoader() {
-      @Override
-      public void close() {}
-
-      @Override
-      public Object load(LoaderHelper helper) throws CacheLoaderException {
-        return null;
-      }
-    };
-
-    bucketCreated.await();
-    pr.getAttributesMutator().setCacheLoader(loader);
-    loaderAdded.countDown();
-    createBuckets.join();
-
-
-    // Assert that all buckets have received the cache loader
-    for (BucketRegion bucket : pr.getDataStore().getAllLocalBucketRegions()) {
-      assertEquals(loader, bucket.getCacheLoader());
-    }
-  }
-
   /**
    * This method checks whether the canAccomodateMoreBytesSafely returns false after reaching the
    * localMax memory.

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <commits@geode.apache.org>'].
