This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c040c59bfd5db9222d5675f205609cf16dad4209 Author: Xiaojian Zhou <[email protected]> AuthorDate: Mon Mar 16 17:35:35 2020 -0700 PR.clear's event id should be created and used in BR (#4805) * GEODE-7857: PR.clear's event id should be created and used in BR --- .../PartitionedRegionPersistentClearDUnitTest.java | 2 +- .../codeAnalysis/sanctionedDataSerializables.txt | 4 +- .../geode/internal/cache/PartitionedRegion.java | 8 +-- .../internal/cache/partitioned/ClearPRMessage.java | 12 ++-- .../internal/cache/PartitionedRegionTest.java | 65 ++++++++++++++++++++++ 5 files changed, 80 insertions(+), 11 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java index 847699b..c758446 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java @@ -21,6 +21,6 @@ import org.apache.geode.cache.RegionShortcut; public class PartitionedRegionPersistentClearDUnitTest extends PartitionedRegionClearDUnitTest { protected RegionShortcut getRegionShortCut() { - return RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW; + return RegionShortcut.PARTITION_REDUNDANT_PERSISTENT; } } diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index bdeaf11..6dd3a34 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1369,8 +1369,8 @@ fromData,27 toData,27 
org/apache/geode/internal/cache/partitioned/ClearPRMessage,2 -fromData,19 -toData,36 +fromData,30 +toData,44 org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2 fromData,17 diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 82bb2e0..fb312a2 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -2189,7 +2189,7 @@ public class PartitionedRegion extends LocalRegion } // create ClearPRMessage per bucket - List<ClearPRMessage> clearMsgList = createClearPRMessages(); + List<ClearPRMessage> clearMsgList = createClearPRMessages(regionEvent.getEventId()); for (ClearPRMessage clearPRMessage : clearMsgList) { int bucketId = clearPRMessage.getBucketId(); checkReadiness(); @@ -2361,10 +2361,10 @@ public class PartitionedRegion extends LocalRegion } } - List<ClearPRMessage> createClearPRMessages() { + List<ClearPRMessage> createClearPRMessages(EventID eventID) { ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>(); - for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) { - ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId); + for (int bucketId = 0; bucketId < getTotalNumberOfBuckets(); bucketId++) { + ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId, eventID); clearMsgList.add(clearPRMessage); } return clearMsgList; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java index 9fa8057..cc01920 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java @@ -56,6 +56,8 @@ public class 
ClearPRMessage extends PartitionMessageWithDirectReply { private Integer bucketId; + private EventID eventID; + public static final String BUCKET_NON_PRIMARY_MESSAGE = "The bucket region on target member is no longer primary"; public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION = @@ -71,8 +73,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { */ public ClearPRMessage() {} - public ClearPRMessage(int bucketId) { + public ClearPRMessage(int bucketId, EventID eventID) { this.bucketId = bucketId; + this.eventID = eventID; } public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients, @@ -119,6 +122,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { } else { InternalDataSerializer.writeSignedVL(bucketId, out); } + DataSerializer.writeObject(this.eventID, out); } @Override @@ -126,6 +130,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { throws IOException, ClassNotFoundException { super.fromData(in, context); this.bucketId = (int) InternalDataSerializer.readSignedVL(in); + this.eventID = (EventID) DataSerializer.readObject(in); } @Override @@ -168,9 +173,8 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE); } try { - RegionEventImpl regionEvent = new RegionEventImpl(); - regionEvent.setOperation(Operation.REGION_CLEAR); - regionEvent.setRegion(bucketRegion); + RegionEventImpl regionEvent = new RegionEventImpl(bucketRegion, Operation.REGION_CLEAR, null, + false, region.getMyId(), eventID); bucketRegion.cmnClearRegion(regionEvent, true, true); } catch (PartitionOfflineException poe) { logger.info( diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java index 742db8a..898c4f7 100644 --- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java @@ -21,10 +21,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.catchThrowable; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -39,6 +41,7 @@ import static org.mockito.quality.Strictness.STRICT_STUBS; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -55,6 +58,7 @@ import org.mockito.junit.MockitoRule; import org.apache.geode.CancelCriterion; import org.apache.geode.Statistics; import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.CacheLoader; import org.apache.geode.cache.CacheWriter; import org.apache.geode.cache.Operation; @@ -71,6 +75,7 @@ import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.control.InternalResourceManager; +import org.apache.geode.internal.cache.partitioned.ClearPRMessage; import org.apache.geode.internal.cache.partitioned.FetchKeysMessage; import org.apache.geode.internal.cache.partitioned.colocation.ColocationLoggerFactory; import 
org.apache.geode.internal.cache.tier.sockets.ServerConnection; @@ -208,6 +213,66 @@ public class PartitionedRegionTest { } @Test + public void clearShouldNotThrowUnsupportedOperationException() { + PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); + doNothing().when(spyPartitionedRegion).checkReadiness(); + doCallRealMethod().when(spyPartitionedRegion).basicClear(any()); + doNothing().when(spyPartitionedRegion).basicClear(any(), anyBoolean()); + spyPartitionedRegion.clear(); + } + + @Test(expected = CacheClosedException.class) + public void clearShouldThrowCacheClosedExceptionIfShutdownAll() { + PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); + RegionEventImpl regionEvent = + new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false, + spyPartitionedRegion.getMyId(), true); + when(cache.isCacheAtShutdownAll()).thenReturn(true); + when(cache.getCacheClosedException("Cache is shutting down")) + .thenReturn(new CacheClosedException("Cache is shutting down")); + DistributedLockService lockService = mock(DistributedLockService.class); + when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService); + String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_'); + when(lockService.lock(lockName, -1, -1)).thenReturn(true); + spyPartitionedRegion.basicClear(regionEvent, true); + } + + @Test + public void createClearPRMessagesShouldCreateMessagePerBucket() { + PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); + RegionEventImpl regionEvent = + new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false, + spyPartitionedRegion.getMyId(), true); + when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3); + EventID eventID = new EventID(spyPartitionedRegion.getCache().getDistributedSystem()); + List<ClearPRMessage> msgs = spyPartitionedRegion.createClearPRMessages(eventID); + assertThat(msgs.size()).isEqualTo(3); + } + + @Test + 
public void sendEachMessagePerBucket() { + PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); + RegionEventImpl regionEvent = + new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false, + spyPartitionedRegion.getMyId(), true); + when(cache.isCacheAtShutdownAll()).thenReturn(false); + DistributedLockService lockService = mock(DistributedLockService.class); + when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService); + when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3); + String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_'); + when(lockService.lock(lockName, -1, -1)).thenReturn(true); + when(spyPartitionedRegion.hasListener()).thenReturn(true); + doNothing().when(spyPartitionedRegion).dispatchListenerEvent(any(), any()); + doNothing().when(spyPartitionedRegion).notifyBridgeClients(eq(regionEvent)); + doNothing().when(spyPartitionedRegion).checkReadiness(); + doNothing().when(lockService).unlock(lockName); + spyPartitionedRegion.basicClear(regionEvent, true); + verify(spyPartitionedRegion, times(3)).sendClearMsgByBucket(any(), any()); + verify(spyPartitionedRegion, times(1)).dispatchListenerEvent(any(), any()); + verify(spyPartitionedRegion, times(1)).notifyBridgeClients(eq(regionEvent)); + } + + @Test public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() { // ARRANGE EntryEventImpl clientEvent = mock(EntryEventImpl.class);
