This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d0daa20440449b4120015554cd7b10608f6e9e81
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Jan 27 17:02:48 2020 -0800

    GEODE-7683: introduce BR.cmnClearRegion
---
 .../apache/geode/internal/cache/BucketRegion.java  | 38 +++++++++--
 .../geode/internal/cache/DistributedRegion.java    | 23 +++++--
 .../internal/cache/BucketRegionJUnitTest.java      | 77 ++++++++++++++++++++++
 .../internal/cache/DistributedRegionJUnitTest.java | 18 +++++
 4 files changed, 145 insertions(+), 11 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index ac20526..761c1b1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -557,6 +557,36 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
     }
   }
 
+  @Override
+  void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean 
useRVV) {
+    if (!getBucketAdvisor().isPrimary()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Not primary bucket when doing clear, do nothing");
+      }
+      return;
+    }
+
+    boolean enableRVV = useRVV && getConcurrencyChecksEnabled();
+    RegionVersionVector rvv = null;
+    if (enableRVV) {
+      rvv = getVersionVector().getCloneForTransmission();
+    }
+
+    // get rvvLock
+    Set<InternalDistributedMember> participants =
+        getCacheDistributionAdvisor().adviseInvalidateRegion();
+    try {
+      obtainWriteLocksForClear(regionEvent, participants);
+      // no need to dominate my own rvv.
+      // Clear is on going here, there won't be GII for this member
+      clearRegionLocally(regionEvent, cacheWrite, null);
+      distributeClearOperation(regionEvent, rvv, participants);
+
+      // TODO: call reindexUserDataRegion if there're lucene indexes
+    } finally {
+      releaseWriteLocksForClear(regionEvent, participants);
+    }
+  }
 
   long generateTailKey() {
     long key = 
eventSeqNum.addAndGet(partitionedRegion.getTotalNumberOfBuckets());
@@ -2094,11 +2124,9 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
       // if GII has failed, because there is not primary. So it's safe to set 
these
       // counters to 0.
       oldMemValue = bytesInMemory.getAndSet(0);
-    }
-
-    else {
-      throw new InternalGemFireError(
-          "Trying to clear a bucket region that was not destroyed or in 
initialization.");
+    } else {
+      // BucketRegion's clear is supported now
+      oldMemValue = bytesInMemory.getAndSet(0);
     }
     if (oldMemValue != BUCKET_DESTROYED) {
       partitionedRegion.getPrStats().incDataStoreEntryCount(-sizeBeforeClear);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 9964978..1a62919 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2003,6 +2003,10 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
     super.basicClear(regionEvent, cacheWrite);
   }
 
+  void distributeClearOperation(RegionEventImpl regionEvent, 
RegionVersionVector rvv,
+      Set<InternalDistributedMember> participants) {
+    DistributedClearOperation.clear(regionEvent, rvv, participants);
+  }
 
   @Override
   void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean 
useRVV) {
@@ -2025,7 +2029,7 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
             obtainWriteLocksForClear(regionEvent, participants);
             clearRegionLocally(regionEvent, cacheWrite, null);
             if (!regionEvent.isOriginRemote() && 
regionEvent.getOperation().isDistributed()) {
-              DistributedClearOperation.clear(regionEvent, null, participants);
+              distributeClearOperation(regionEvent, null, participants);
             }
           } finally {
             releaseWriteLocksForClear(regionEvent, participants);
@@ -2081,10 +2085,12 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
   /**
    * obtain locks preventing generation of new versions in other members
    */
-  private void obtainWriteLocksForClear(RegionEventImpl regionEvent,
+  protected void obtainWriteLocksForClear(RegionEventImpl regionEvent,
       Set<InternalDistributedMember> participants) {
     lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
-    DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
+    if (!isUsedForPartitionedRegionBucket()) {
+      DistributedClearOperation.lockAndFlushToOthers(regionEvent, 
participants);
+    }
   }
 
   /**
@@ -2121,7 +2127,7 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
   /**
    * releases the locks obtained in obtainWriteLocksForClear
    */
-  private void releaseWriteLocksForClear(RegionEventImpl regionEvent,
+  protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
       Set<InternalDistributedMember> participants) {
 
     ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
@@ -2129,8 +2135,13 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
       armLockTestHook.beforeRelease(this, regionEvent);
     }
 
-    getVersionVector().unlockForClear(getMyId());
-    DistributedClearOperation.releaseLocks(regionEvent, participants);
+    RegionVersionVector rvv = getVersionVector();
+    if (rvv != null) {
+      rvv.unlockForClear(getMyId());
+    }
+    if (!isUsedForPartitionedRegionBucket()) {
+      DistributedClearOperation.releaseLocks(regionEvent, participants);
+    }
 
     if (armLockTestHook != null) {
       armLockTestHook.afterRelease(this, regionEvent);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
index 72e6657..c7cf5a6 100755
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
@@ -14,7 +14,9 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.anyLong;
@@ -31,7 +33,10 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.junit.Test;
+
 import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.statistics.StatisticsClock;
 
 public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
@@ -128,4 +133,76 @@ public class BucketRegionJUnitTest extends 
DistributedRegionJUnitTest {
     }
   }
 
+  @Test
+  public void cmnClearRegionWillDoNothingIfNotPrimary() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    RegionVersionVector rvv = mock(RegionVersionVector.class);
+    doReturn(rvv).when(region).getVersionVector();
+    doReturn(ba).when(region).getBucketAdvisor();
+    when(ba.isPrimary()).thenReturn(false);
+    region.cmnClearRegion(event, true, true);
+    verify(region, never()).clearRegionLocally(eq(event), eq(true), eq(rvv));
+  }
+
+  @Test
+  public void cmnClearRegionCalledOnPrimary() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    RegionVersionVector rvv = mock(RegionVersionVector.class);
+    doReturn(rvv).when(region).getVersionVector();
+    doReturn(true).when(region).getConcurrencyChecksEnabled();
+    doReturn(ba).when(region).getBucketAdvisor();
+    doNothing().when(region).distributeClearOperation(any(), any(), any());
+    doNothing().when(region).lockLocallyForClear(any(), any(), any());
+    doNothing().when(region).clearRegionLocally(event, true, null);
+    when(ba.isPrimary()).thenReturn(true);
+    region.cmnClearRegion(event, true, true);
+    verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null));
+  }
+
+  @Test
+  public void clearWillUseNullAsRVVWhenConcurrencyCheckDisabled() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    doReturn(false).when(region).getConcurrencyChecksEnabled();
+    doReturn(ba).when(region).getBucketAdvisor();
+    doNothing().when(region).distributeClearOperation(any(), any(), any());
+    doNothing().when(region).lockLocallyForClear(any(), any(), any());
+    doNothing().when(region).clearRegionLocally(event, true, null);
+    when(ba.isPrimary()).thenReturn(true);
+    region.cmnClearRegion(event, true, true);
+    verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null));
+  }
+
+  @Test
+  public void obtainWriteLocksForClearInBRShouldNotDistribute() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    doNothing().when(region).lockLocallyForClear(any(), any(), any());
+    region.obtainWriteLocksForClear(event, null);
+    assertTrue(region.isUsedForPartitionedRegionBucket());
+  }
+
+  @Test
+  public void updateSizeToZeroOnClearBucketRegion() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    PartitionedRegion pr = region.getPartitionedRegion();
+    PartitionedRegionDataStore prds = mock(PartitionedRegionDataStore.class);
+    PartitionedRegionStats prStats = mock(PartitionedRegionStats.class);
+    when(pr.getPrStats()).thenReturn(prStats);
+    doNothing().when(prStats).incDataStoreEntryCount(anyInt());
+    doNothing().when(prds).updateMemoryStats(anyInt());
+    when(pr.getDataStore()).thenReturn(prds);
+    region.updateSizeOnCreate("key1", 20);
+    long sizeBeforeClear = region.getTotalBytes();
+    assertEquals(20, sizeBeforeClear);
+    region.updateSizeOnClearRegion((int) sizeBeforeClear);
+    long sizeAfterClear = region.getTotalBytes();
+    assertEquals(0, sizeAfterClear);
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
index 9fbd8fc..ca53ced 100755
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.apache.geode.internal.Assert.fail;
 import static 
org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertFalse;
@@ -53,6 +54,14 @@ public class DistributedRegionJUnitTest
   @Override
   protected void setInternalRegionArguments(InternalRegionArguments ira) {}
 
+  protected RegionEventImpl createClearRegionEvent() {
+    DistributedRegion region = prepare(true, true);
+    DistributedMember member = mock(DistributedMember.class);
+    RegionEventImpl regionEvent = new RegionEventImpl(region, 
Operation.REGION_CLEAR, null, false,
+        member, true);
+    return regionEvent;
+  }
+
   @Override
   protected DistributedRegion createAndDefineRegion(boolean 
isConcurrencyChecksEnabled,
       RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache,
@@ -246,4 +255,13 @@ public class DistributedRegionJUnitTest
     region.basicBridgeReplace("key1", "value1", false, null, client, true, 
clientEvent);
     assertThat(clientEvent.getVersionTag().equals(tag));
   }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void localClearIsNotSupportedOnReplicatedRegion() {
+    RegionEventImpl event = createClearRegionEvent();
+    DistributedRegion region = (DistributedRegion) event.getRegion();
+    region.basicLocalClear(event);
+    fail("Expect UnsupportedOperationException");
+  }
+
 }

Reply via email to