This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit c64039411520f2925ea6f3dcfd9a35c04d794980 Author: zhouxh <[email protected]> AuthorDate: Mon Jan 27 17:02:48 2020 -0800 GEODE-7683: introduce BR.cmnClearRegion --- .../apache/geode/internal/cache/BucketRegion.java | 38 +++++++++-- .../geode/internal/cache/DistributedRegion.java | 23 +++++-- .../internal/cache/BucketRegionJUnitTest.java | 77 ++++++++++++++++++++++ .../internal/cache/DistributedRegionJUnitTest.java | 18 +++++ 4 files changed, 145 insertions(+), 11 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index ac20526..761c1b1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -557,6 +557,36 @@ public class BucketRegion extends DistributedRegion implements Bucket { } } + @Override + void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) { + if (!getBucketAdvisor().isPrimary()) { + if (logger.isDebugEnabled()) { + logger.debug("Not primary bucket when doing clear, do nothing"); + } + return; + } + + boolean enableRVV = useRVV && getConcurrencyChecksEnabled(); + RegionVersionVector rvv = null; + if (enableRVV) { + rvv = getVersionVector().getCloneForTransmission(); + } + + // get rvvLock + Set<InternalDistributedMember> participants = + getCacheDistributionAdvisor().adviseInvalidateRegion(); + try { + obtainWriteLocksForClear(regionEvent, participants); + // no need to dominate my own rvv. 
+ // Clear is on going here, there won't be GII for this member + clearRegionLocally(regionEvent, cacheWrite, null); + distributeClearOperation(regionEvent, rvv, participants); + + // TODO: call reindexUserDataRegion if there're lucene indexes + } finally { + releaseWriteLocksForClear(regionEvent, participants); + } + } long generateTailKey() { long key = eventSeqNum.addAndGet(partitionedRegion.getTotalNumberOfBuckets()); @@ -2094,11 +2124,9 @@ public class BucketRegion extends DistributedRegion implements Bucket { // if GII has failed, because there is not primary. So it's safe to set these // counters to 0. oldMemValue = bytesInMemory.getAndSet(0); - } - - else { - throw new InternalGemFireError( - "Trying to clear a bucket region that was not destroyed or in initialization."); + } else { + // BucketRegion's clear is supported now + oldMemValue = bytesInMemory.getAndSet(0); } if (oldMemValue != BUCKET_DESTROYED) { partitionedRegion.getPrStats().incDataStoreEntryCount(-sizeBeforeClear); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 9964978..1a62919 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -2003,6 +2003,10 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute super.basicClear(regionEvent, cacheWrite); } + void distributeClearOperation(RegionEventImpl regionEvent, RegionVersionVector rvv, + Set<InternalDistributedMember> participants) { + DistributedClearOperation.clear(regionEvent, rvv, participants); + } @Override void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) { @@ -2025,7 +2029,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute obtainWriteLocksForClear(regionEvent, participants); 
clearRegionLocally(regionEvent, cacheWrite, null); if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) { - DistributedClearOperation.clear(regionEvent, null, participants); + distributeClearOperation(regionEvent, null, participants); } } finally { releaseWriteLocksForClear(regionEvent, participants); @@ -2081,10 +2085,12 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute /** * obtain locks preventing generation of new versions in other members */ - private void obtainWriteLocksForClear(RegionEventImpl regionEvent, + protected void obtainWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) { lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent); - DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants); + if (!isUsedForPartitionedRegionBucket()) { + DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants); + } } /** @@ -2121,7 +2127,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute /** * releases the locks obtained in obtainWriteLocksForClear */ - private void releaseWriteLocksForClear(RegionEventImpl regionEvent, + protected void releaseWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) { ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook(); @@ -2129,8 +2135,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute armLockTestHook.beforeRelease(this, regionEvent); } - getVersionVector().unlockForClear(getMyId()); - DistributedClearOperation.releaseLocks(regionEvent, participants); + RegionVersionVector rvv = getVersionVector(); + if (rvv != null) { + rvv.unlockForClear(getMyId()); + } + if (!isUsedForPartitionedRegionBucket()) { + DistributedClearOperation.releaseLocks(regionEvent, participants); + } if (armLockTestHook != null) { armLockTestHook.afterRelease(this, regionEvent); diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java index 72e6657..c7cf5a6 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java @@ -14,7 +14,9 @@ */ package org.apache.geode.internal.cache; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.anyLong; @@ -31,7 +33,10 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.junit.Test; + import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.internal.cache.versions.RegionVersionVector; import org.apache.geode.internal.statistics.StatisticsClock; public class BucketRegionJUnitTest extends DistributedRegionJUnitTest { @@ -128,4 +133,76 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest { } } + @Test + public void cmnClearRegionWillDoNothingIfNotPrimary() { + RegionEventImpl event = createClearRegionEvent(); + BucketRegion region = (BucketRegion) event.getRegion(); + BucketAdvisor ba = mock(BucketAdvisor.class); + RegionVersionVector rvv = mock(RegionVersionVector.class); + doReturn(rvv).when(region).getVersionVector(); + doReturn(ba).when(region).getBucketAdvisor(); + when(ba.isPrimary()).thenReturn(false); + region.cmnClearRegion(event, true, true); + verify(region, never()).clearRegionLocally(eq(event), eq(true), eq(rvv)); + } + + @Test + public void cmnClearRegionCalledOnPrimary() { + RegionEventImpl event = createClearRegionEvent(); + BucketRegion region = (BucketRegion) event.getRegion(); + BucketAdvisor ba = 
mock(BucketAdvisor.class); + RegionVersionVector rvv = mock(RegionVersionVector.class); + doReturn(rvv).when(region).getVersionVector(); + doReturn(true).when(region).getConcurrencyChecksEnabled(); + doReturn(ba).when(region).getBucketAdvisor(); + doNothing().when(region).distributeClearOperation(any(), any(), any()); + doNothing().when(region).lockLocallyForClear(any(), any(), any()); + doNothing().when(region).clearRegionLocally(event, true, null); + when(ba.isPrimary()).thenReturn(true); + region.cmnClearRegion(event, true, true); + verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null)); + } + + @Test + public void clearWillUseNullAsRVVWhenConcurrencyCheckDisabled() { + RegionEventImpl event = createClearRegionEvent(); + BucketRegion region = (BucketRegion) event.getRegion(); + BucketAdvisor ba = mock(BucketAdvisor.class); + doReturn(false).when(region).getConcurrencyChecksEnabled(); + doReturn(ba).when(region).getBucketAdvisor(); + doNothing().when(region).distributeClearOperation(any(), any(), any()); + doNothing().when(region).lockLocallyForClear(any(), any(), any()); + doNothing().when(region).clearRegionLocally(event, true, null); + when(ba.isPrimary()).thenReturn(true); + region.cmnClearRegion(event, true, true); + verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null)); + } + + @Test + public void obtainWriteLocksForClearInBRShouldNotDistribute() { + RegionEventImpl event = createClearRegionEvent(); + BucketRegion region = (BucketRegion) event.getRegion(); + doNothing().when(region).lockLocallyForClear(any(), any(), any()); + region.obtainWriteLocksForClear(event, null); + assertTrue(region.isUsedForPartitionedRegionBucket()); + } + + @Test + public void updateSizeToZeroOnClearBucketRegion() { + RegionEventImpl event = createClearRegionEvent(); + BucketRegion region = (BucketRegion) event.getRegion(); + PartitionedRegion pr = region.getPartitionedRegion(); + PartitionedRegionDataStore prds = 
mock(PartitionedRegionDataStore.class); + PartitionedRegionStats prStats = mock(PartitionedRegionStats.class); + when(pr.getPrStats()).thenReturn(prStats); + doNothing().when(prStats).incDataStoreEntryCount(anyInt()); + doNothing().when(prds).updateMemoryStats(anyInt()); + when(pr.getDataStore()).thenReturn(prds); + region.updateSizeOnCreate("key1", 20); + long sizeBeforeClear = region.getTotalBytes(); + assertEquals(20, sizeBeforeClear); + region.updateSizeOnClearRegion((int) sizeBeforeClear); + long sizeAfterClear = region.getTotalBytes(); + assertEquals(0, sizeAfterClear); + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java index 9fbd8fc..ca53ced 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java @@ -14,6 +14,7 @@ */ package org.apache.geode.internal.cache; +import static org.apache.geode.internal.Assert.fail; import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; @@ -53,6 +54,14 @@ public class DistributedRegionJUnitTest @Override protected void setInternalRegionArguments(InternalRegionArguments ira) {} + protected RegionEventImpl createClearRegionEvent() { + DistributedRegion region = prepare(true, true); + DistributedMember member = mock(DistributedMember.class); + RegionEventImpl regionEvent = new RegionEventImpl(region, Operation.REGION_CLEAR, null, false, + member, true); + return regionEvent; + } + @Override protected DistributedRegion createAndDefineRegion(boolean isConcurrencyChecksEnabled, RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache, @@ -246,4 +255,13 @@ public class DistributedRegionJUnitTest 
region.basicBridgeReplace("key1", "value1", false, null, client, true, clientEvent); assertThat(clientEvent.getVersionTag().equals(tag)); } + + @Test(expected = UnsupportedOperationException.class) + public void localClearIsNotSupportedOnReplicatedRegion() { + RegionEventImpl event = createClearRegionEvent(); + DistributedRegion region = (DistributedRegion) event.getRegion(); + region.basicLocalClear(event); + fail("Expect UnsupportedOperationException"); + } + }
