This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5cb2a59  Revert "GEODE-3647: Fix race condition"
5cb2a59 is described below

commit 5cb2a591199845190751f52e4da758c5ccd4d44e
Author: Darrel Schneider <[email protected]>
AuthorDate: Mon Oct 2 13:46:07 2017 -0700

    Revert "GEODE-3647: Fix race condition"
    
    This reverts commit 96149530d82e0b62e9df1a043bfd7d0e01d3411a.
---
 .../geode/internal/cache/PartitionedRegion.java    |  49 +++--
 .../internal/cache/PartitionedRegionDataStore.java |  14 +-
 .../PartitionedRegionAttributesMutatorTest.java    | 211 ---------------------
 .../cache/PartitionedRegionDataStoreJUnitTest.java |  63 ++++++
 4 files changed, 94 insertions(+), 243 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 0a0b802..0ada1f9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -100,7 +100,6 @@ import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.partition.PartitionListener;
 import org.apache.geode.cache.partition.PartitionNotAvailableException;
-import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.cache.persistence.PartitionOfflineException;
 import org.apache.geode.cache.persistence.PersistentID;
 import org.apache.geode.cache.query.FunctionDomainException;
@@ -9151,14 +9150,14 @@ public class PartitionedRegion extends LocalRegion
   @Override
   public ExpirationAttributes setEntryTimeToLive(ExpirationAttributes timeToLive) {
     ExpirationAttributes attr = super.setEntryTimeToLive(timeToLive);
-
-    /*
-     * All buckets must be created to make this change, otherwise it is possible for
-     * updatePRConfig(...) to make changes that cause bucket creation to live lock
-     */
-    PartitionRegionHelper.assignBucketsToPartitions(this);
-    dataStore.lockBucketCreationAndVisit(
-        (bucketId, r) -> r.getAttributesMutator().setEntryTimeToLive(timeToLive));
+    // Set to Bucket regions as well
+    if (this.getDataStore() != null) { // not for accessors
+      for (Object o : this.getDataStore().getAllLocalBuckets()) {
+        Map.Entry entry = (Map.Entry) o;
+        Region bucketRegion = (Region) entry.getValue();
+        bucketRegion.getAttributesMutator().setEntryTimeToLive(timeToLive);
+      }
+    }
     updatePRConfig(getPRConfigWithLatestExpirationAttributes(), false);
     return attr;
   }
@@ -9182,8 +9181,13 @@ public class PartitionedRegion extends LocalRegion
   public CustomExpiry setCustomEntryTimeToLive(CustomExpiry custom) {
     CustomExpiry expiry = super.setCustomEntryTimeToLive(custom);
     // Set to Bucket regions as well
-    dataStore.lockBucketCreationAndVisit(
-        (bucketId, r) -> r.getAttributesMutator().setCustomEntryTimeToLive(custom));
+    if (this.getDataStore() != null) { // not for accessors
+      for (Object o : this.getDataStore().getAllLocalBuckets()) {
+        Map.Entry entry = (Map.Entry) o;
+        Region bucketRegion = (Region) entry.getValue();
+        bucketRegion.getAttributesMutator().setCustomEntryTimeToLive(custom);
+      }
+    }
     return expiry;
   }
 
@@ -9202,14 +9206,14 @@ public class PartitionedRegion extends LocalRegion
   @Override
   public ExpirationAttributes setEntryIdleTimeout(ExpirationAttributes idleTimeout) {
     ExpirationAttributes attr = super.setEntryIdleTimeout(idleTimeout);
-    /*
-     * All buckets must be created to make this change, otherwise it is possible for
-     * updatePRConfig(...) to make changes that cause bucket creation to live lock
-     */
-    PartitionRegionHelper.assignBucketsToPartitions(this);
     // Set to Bucket regions as well
-    dataStore.lockBucketCreationAndVisit(
-        (bucketId, r) -> r.getAttributesMutator().setEntryIdleTimeout(idleTimeout));
+    if (this.getDataStore() != null) { // not for accessors
+      for (Object o : this.getDataStore().getAllLocalBuckets()) {
+        Map.Entry entry = (Map.Entry) o;
+        Region bucketRegion = (Region) entry.getValue();
+        bucketRegion.getAttributesMutator().setEntryIdleTimeout(idleTimeout);
+      }
+    }
     updatePRConfig(getPRConfigWithLatestExpirationAttributes(), false);
     return attr;
   }
@@ -9224,8 +9228,13 @@ public class PartitionedRegion extends LocalRegion
   public CustomExpiry setCustomEntryIdleTimeout(CustomExpiry custom) {
     CustomExpiry expiry = super.setCustomEntryIdleTimeout(custom);
     // Set to Bucket regions as well
-    dataStore.lockBucketCreationAndVisit(
-        (bucketId, r) -> r.getAttributesMutator().setCustomEntryIdleTimeout(custom));
+    if (this.getDataStore() != null) { // not for accessors
+      for (Object o : this.getDataStore().getAllLocalBuckets()) {
+        Map.Entry entry = (Map.Entry) o;
+        Region bucketRegion = (Region) entry.getValue();
+        bucketRegion.getAttributesMutator().setCustomEntryIdleTimeout(custom);
+      }
+    }
     return expiry;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 91d230e..dc91f0f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -953,16 +953,6 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
     }
   }
 
-  protected void lockBucketCreationAndVisit(BucketVisitor visitor) {
-    StoppableWriteLock lock = this.bucketCreationLock.writeLock();
-    lock.lock();
-    try {
-      visitBuckets(visitor);
-    } finally {
-      lock.unlock();
-    }
-  }
-
   /**
    * Gets the total amount of memory in bytes allocated for all values for 
this PR in this VM. This
    * is the current memory (MB) watermark for data in this PR.
@@ -2486,8 +2476,8 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
    * Interface for visiting buckets
    */
   // public visibility for tests
-  public interface BucketVisitor {
-    void visit(Integer bucketId, Region r);
+  public static abstract class BucketVisitor {
+    abstract public void visit(Integer bucketId, Region r);
   }
 
   // public visibility for tests
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java
deleted file mode 100644
index 4e2e965..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAttributesMutatorTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license *
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding *
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance with the 
License. You may obtain a *
- * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * 
Unless required by
- * applicable law or agreed to in writing, software distributed under the 
License * is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express * or implied.
- * See the License for the specific language governing permissions and 
limitations under * the
- * License. *
- *
- */
-
-package org.apache.geode.internal.cache;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.CacheLoader;
-import org.apache.geode.cache.CacheLoaderException;
-import org.apache.geode.cache.CustomExpiry;
-import org.apache.geode.cache.ExpirationAction;
-import org.apache.geode.cache.ExpirationAttributes;
-import org.apache.geode.cache.LoaderHelper;
-import org.apache.geode.cache.PartitionAttributes;
-import org.apache.geode.cache.PartitionAttributesFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.partition.PartitionRegionHelper;
-import 
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverAdapter;
-import 
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder;
-import org.apache.geode.test.dunit.rules.ServerStarterRule;
-import org.apache.geode.test.junit.categories.IntegrationTest;
-
-@Category(IntegrationTest.class)
-public class PartitionedRegionAttributesMutatorTest {
-  private static final String TEST_REGION_NAME = "testRegion";
-  private static final int DEFAULT_WAIT_DURATION = 5;
-  private static final TimeUnit DEFAULT_WAIT_UNIT = TimeUnit.SECONDS;
-
-  @Rule
-  public ServerStarterRule server = new ServerStarterRule().withAutoStart();
-
-  private final CountDownLatch mutationMade = new CountDownLatch(1);
-  private final CountDownLatch bucketCreated = new CountDownLatch(1);
-  private PartitionedRegion pr;
-
-  @Test
-  public void testChangeCacheLoaderDuringBucketCreation()
-      throws InterruptedException, TimeoutException, ExecutionException {
-    createRegion();
-    CacheLoader loader = createTestCacheLoader();
-
-    CompletableFuture<Void> createBucket =
-        CompletableFuture.runAsync(() -> 
PartitionRegionHelper.assignBucketsToPartitions(pr));
-    bucketCreated.await();
-    pr.getAttributesMutator().setCacheLoader(loader);
-    mutationMade.countDown();
-    createBucket.get(DEFAULT_WAIT_DURATION, DEFAULT_WAIT_UNIT);
-
-    getAllBucketRegions(pr).forEach(region -> assertEquals(loader, 
region.getCacheLoader()));
-  }
-
-  @Test
-  public void testChangeCustomEntryTtlDuringBucketCreation()
-      throws InterruptedException, ExecutionException {
-    createRegion();
-    CustomExpiry customExpiry = createTestCustomExpiry();
-
-    CompletableFuture<Void> createBucket =
-        CompletableFuture.runAsync(() -> 
PartitionRegionHelper.assignBucketsToPartitions(pr));
-    bucketCreated.await();
-    pr.getAttributesMutator().setCustomEntryTimeToLive(customExpiry);
-    mutationMade.countDown();
-    createBucket.get();
-
-    getAllBucketRegions(pr)
-        .forEach(region -> assertEquals(customExpiry, 
region.customEntryTimeToLive));
-  }
-
-  @Test
-  public void testChangeCustomEntryIdleTimeoutDuringBucketCreation()
-      throws InterruptedException, ExecutionException {
-    createRegion();
-    CustomExpiry customExpiry = createTestCustomExpiry();
-
-    CompletableFuture<Void> createBucket =
-        CompletableFuture.runAsync(() -> 
PartitionRegionHelper.assignBucketsToPartitions(pr));
-    bucketCreated.await();
-    pr.getAttributesMutator().setCustomEntryIdleTimeout(customExpiry);
-    mutationMade.countDown();
-    createBucket.get();
-
-    getAllBucketRegions(pr)
-        .forEach(region -> assertEquals(customExpiry, 
region.customEntryIdleTimeout));
-  }
-
-  @Test
-  public void testChangeEntryIdleTimeoutDuringBucketCreation()
-      throws InterruptedException, ExecutionException {
-    createRegionWithFewBuckets();
-
-    CompletableFuture<Void> createBucket =
-        CompletableFuture.runAsync(() -> 
PartitionRegionHelper.assignBucketsToPartitions(pr));
-    bucketCreated.await();
-    ExpirationAttributes expirationAttributes =
-        new ExpirationAttributes(1000, ExpirationAction.DESTROY);
-    pr.getAttributesMutator().setEntryIdleTimeout(expirationAttributes);
-    mutationMade.countDown();
-    createBucket.get();
-
-    getAllBucketRegions(pr)
-        .forEach(region -> assertEquals(expirationAttributes, 
region.getEntryIdleTimeout()));
-  }
-
-  @Test
-  public void testChangeEntryTtlDuringBucketCreation()
-      throws InterruptedException, ExecutionException {
-    createRegionWithFewBuckets();
-
-    CompletableFuture<Void> createBucket =
-        CompletableFuture.runAsync(() -> 
PartitionRegionHelper.assignBucketsToPartitions(pr));
-    bucketCreated.await();
-    ExpirationAttributes expirationAttributes =
-        new ExpirationAttributes(1000, ExpirationAction.DESTROY);
-    pr.getAttributesMutator().setEntryTimeToLive(expirationAttributes);
-    mutationMade.countDown();
-    createBucket.get();
-
-    getAllBucketRegions(pr)
-        .forEach(region -> assertEquals(expirationAttributes, 
region.getEntryTimeToLive()));
-  }
-
-  private void createRegion() {
-    pr = (PartitionedRegion) 
server.getCache().createRegionFactory(RegionShortcut.PARTITION)
-        .setStatisticsEnabled(true).create(TEST_REGION_NAME);
-    setRegionObserver();
-  }
-
-  private void createRegionWithFewBuckets() {
-    PartitionAttributes partitionAttributes =
-        new PartitionAttributesFactory().setTotalNumBuckets(5).create();
-    pr = (PartitionedRegion) 
server.getCache().createRegionFactory(RegionShortcut.PARTITION)
-        .setStatisticsEnabled(true).setPartitionAttributes(partitionAttributes)
-        .create(TEST_REGION_NAME);
-    setRegionObserver();
-  }
-
-  // Adds an observer which will block bucket creation and wait for a loader 
to be added
-  private void setRegionObserver() {
-    PartitionedRegionObserverHolder.setInstance(new 
PartitionedRegionObserverAdapter() {
-      @Override
-      public void beforeAssignBucket(PartitionedRegion partitionedRegion, int 
bucketId) {
-        try {
-          // Indicate that the bucket has been created
-          bucketCreated.countDown();
-
-          // Wait for the loader to be added. if the synchronization
-          // is correct, this would wait for ever because setting the
-          // cache loader will wait for this method. So time out after
-          // 1 second, which should be good enough to cause a failure
-          // if the synchronization is broken.
-          mutationMade.await(1, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-          throw new RuntimeException("Interrupted");
-        }
-      }
-    });
-  }
-
-  private Set<BucketRegion> getAllBucketRegions(PartitionedRegion pr) {
-    return pr.getDataStore().getAllLocalBucketRegions();
-  }
-
-  private CacheLoader createTestCacheLoader() {
-    return new CacheLoader() {
-      @Override
-      public void close() {}
-
-      @Override
-      public Object load(LoaderHelper helper) throws CacheLoaderException {
-        return null;
-      }
-    };
-  }
-
-  private CustomExpiry createTestCustomExpiry() {
-    return new CustomExpiry() {
-      @Override
-      public ExpirationAttributes getExpiry(Region.Entry entry) {
-        return null;
-      }
-
-      @Override
-      public void close() {
-
-      }
-    };
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
index d9396da..cd0917e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreJUnitTest.java
@@ -15,7 +15,10 @@
 package org.apache.geode.internal.cache;
 
 import org.apache.geode.cache.*;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.distributed.DistributedSystem;
+import 
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverAdapter;
+import 
org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
@@ -23,6 +26,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.junit.Assert.*;
@@ -103,6 +108,64 @@ public class PartitionedRegionDataStoreJUnitTest {
 
   }
 
+  @Test
+  public void testChangeCacheLoaderDuringBucketCreation() throws Exception {
+    final PartitionedRegion pr =
+        (PartitionedRegion) cache.createRegionFactory(RegionShortcut.PARTITION)
+            .create("testChangeCacheLoaderDuringBucketCreation");
+
+    // Add an observer which will block bucket creation and wait for a loader to be added
+    final CountDownLatch loaderAdded = new CountDownLatch(1);
+    final CountDownLatch bucketCreated = new CountDownLatch(1);
+    PartitionedRegionObserverHolder.setInstance(new 
PartitionedRegionObserverAdapter() {
+      @Override
+      public void beforeAssignBucket(PartitionedRegion partitionedRegion, int 
bucketId) {
+        try {
+          // Indicate that the bucket has been created
+          bucketCreated.countDown();
+
+          // Wait for the loader to be added. if the synchronization
+          // is correct, this would wait for ever because setting the
+          // cache loader will wait for this method. So time out after
+          // 1 second, which should be good enough to cause a failure
+          // if the synchronization is broken.
+          loaderAdded.await(1, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Interrupted");
+        }
+      }
+    });
+
+    Thread createBuckets = new Thread() {
+      public void run() {
+        PartitionRegionHelper.assignBucketsToPartitions(pr);
+      }
+    };
+
+    createBuckets.start();
+
+    CacheLoader loader = new CacheLoader() {
+      @Override
+      public void close() {}
+
+      @Override
+      public Object load(LoaderHelper helper) throws CacheLoaderException {
+        return null;
+      }
+    };
+
+    bucketCreated.await();
+    pr.getAttributesMutator().setCacheLoader(loader);
+    loaderAdded.countDown();
+    createBuckets.join();
+
+
+    // Assert that all buckets have received the cache loader
+    for (BucketRegion bucket : pr.getDataStore().getAllLocalBucketRegions()) {
+      assertEquals(loader, bucket.getCacheLoader());
+    }
+  }
+
   /**
    * This method checks whether the canAccomodateMoreBytesSafely returns false after reaching the
    * localMax memory.

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to