This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7d522e694d04e6d417da1f1c003ea48a9d4a9c58 Author: Kirk Lund <[email protected]> AuthorDate: Fri Apr 9 15:06:40 2021 -0700 GEODE-9132: Delete ClearPRMessage --- .../org/apache/geode/internal/DSFIDFactory.java | 1 - .../geode/internal/cache/PartitionedRegion.java | 10 - .../internal/cache/partitioned/ClearPRMessage.java | 320 --------------------- .../internal/cache/PartitionedRegionTest.java | 15 - .../cache/partitioned/ClearPRMessageTest.java | 260 ----------------- 5 files changed, 606 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index a89400f..07c396f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -290,7 +290,6 @@ import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe; import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage; import org.apache.geode.internal.cache.partitioned.BucketSizeMessage; import org.apache.geode.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage; -import org.apache.geode.internal.cache.partitioned.ClearPRMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueReplyMessage; import org.apache.geode.internal.cache.partitioned.CreateBucketMessage; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 278266b..61be3b2 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -181,7 +181,6 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWa import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl; import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender; import org.apache.geode.internal.cache.ha.ThreadIdentifier; -import org.apache.geode.internal.cache.partitioned.ClearPRMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse; import org.apache.geode.internal.cache.partitioned.DestroyMessage; @@ -2193,15 +2192,6 @@ public class PartitionedRegion extends LocalRegion throw new UnsupportedOperationException(); } - List<ClearPRMessage> createClearPRMessages(EventID eventID) { - ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>(); - for (int bucketId = 0; bucketId < getTotalNumberOfBuckets(); bucketId++) { - ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId, eventID); - clearMsgList.add(clearPRMessage); - } - return clearMsgList; - } - @Override void basicLocalClear(RegionEventImpl event) { throw new UnsupportedOperationException(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java deleted file mode 100644 index 2603b78..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.internal.cache.partitioned; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Collections; -import java.util.Set; - -import org.apache.logging.log4j.Logger; - -import org.apache.geode.DataSerializer; -import org.apache.geode.annotations.VisibleForTesting; -import org.apache.geode.cache.CacheException; -import org.apache.geode.cache.Operation; -import org.apache.geode.cache.persistence.PartitionOfflineException; -import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.internal.ClusterDistributionManager; -import org.apache.geode.distributed.internal.DirectReplyProcessor; -import org.apache.geode.distributed.internal.DistributionManager; -import org.apache.geode.distributed.internal.InternalDistributedSystem; -import org.apache.geode.distributed.internal.ReplyException; -import org.apache.geode.distributed.internal.ReplyMessage; -import org.apache.geode.distributed.internal.ReplyProcessor21; -import org.apache.geode.distributed.internal.ReplySender; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.Assert; -import org.apache.geode.internal.InternalDataSerializer; -import org.apache.geode.internal.NanoTimer; -import org.apache.geode.internal.cache.BucketRegion; -import org.apache.geode.internal.cache.EventID; -import org.apache.geode.internal.cache.ForceReattemptException; -import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.RegionEventImpl; -import org.apache.geode.internal.logging.log4j.LogMarker; -import org.apache.geode.internal.serialization.DeserializationContext; -import org.apache.geode.internal.serialization.SerializationContext; -import org.apache.geode.logging.internal.log4j.api.LogService; - -public class ClearPRMessage extends PartitionMessageWithDirectReply { - private static final Logger logger = LogService.getLogger(); - - private Integer bucketId; - - private EventID eventID; - - public static final String BUCKET_NON_PRIMARY_MESSAGE = - "The bucket region on target member is no longer primary"; - public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION = - "An exception was thrown during the local clear operation: "; - - /** - * state from operateOnRegion that must be preserved for transmission from the waiting pool - */ - transient boolean result = false; - - /** - * Empty constructor to satisfy {@link DataSerializer}requirements - */ - public ClearPRMessage() {} - - public ClearPRMessage(int bucketId, EventID eventID) { - this.bucketId = bucketId; - this.eventID = eventID; - } - - public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients, - DirectReplyProcessor replyProcessor) { - this.resetRecipients(); - if (recipients != null) { - setRecipients(recipients); - } - this.regionId = region.getPRId(); - this.processor = replyProcessor; - this.processorId = replyProcessor == null ? 0 : replyProcessor.getProcessorId(); - if (replyProcessor != null) { - replyProcessor.enableSevereAlertProcessing(); - } - } - - public ClearResponse send(DistributedMember recipient, PartitionedRegion region) - throws ForceReattemptException { - Set<InternalDistributedMember> recipients = - Collections.singleton((InternalDistributedMember) recipient); - ClearResponse clearResponse = new ClearResponse(region.getSystem(), recipients); - initMessage(region, recipients, clearResponse); - if (logger.isDebugEnabled()) { - logger.debug("ClearPRMessage.send: recipient is {}, msg is {}", recipient, this); - } - - Set<InternalDistributedMember> failures = region.getDistributionManager().putOutgoing(this); - if (failures != null && failures.size() > 0) { - throw new ForceReattemptException("Failed sending <" + this + "> due to " + failures); - } - return clearResponse; - } - - @Override - public int getDSFID() { - return PR_CLEAR_MESSAGE; - } - - @Override - public void toData(DataOutput out, SerializationContext context) throws IOException { - super.toData(out, context); - if (bucketId == null) { - InternalDataSerializer.writeSignedVL(-1, out); - } else { - InternalDataSerializer.writeSignedVL(bucketId, out); - } - DataSerializer.writeObject(this.eventID, out); - } - - @Override - public void fromData(DataInput in, DeserializationContext context) - throws IOException, ClassNotFoundException { - super.fromData(in, context); - this.bucketId = (int) InternalDataSerializer.readSignedVL(in); - this.eventID = (EventID) DataSerializer.readObject(in); - } - - @Override - public EventID getEventID() { - return null; - } - - /** - * This method is called upon receipt and make the desired changes to the PartitionedRegion Note: - * It is very important that this message does NOT cause any deadlocks as the sender will wait - * indefinitely for the acknowledgement - */ - @Override - @VisibleForTesting - protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager, - PartitionedRegion region, long startTime) { - try { - this.result = doLocalClear(region); - } catch (ForceReattemptException ex) { - sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region, - startTime); - return false; - } - return this.result; - } - - public Integer getBucketId() { - return this.bucketId; - } - - public boolean doLocalClear(PartitionedRegion region) - throws ForceReattemptException { - // Retrieve local bucket region which matches target bucketId - BucketRegion bucketRegion = - region.getDataStore().getInitializedBucketForId(null, this.bucketId); - - boolean lockedForPrimary = bucketRegion.doLockForPrimary(false); - // Check if we obtained primary lock, throw exception if not - if (!lockedForPrimary) { - throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE); - } - try { - RegionEventImpl regionEvent = new RegionEventImpl(bucketRegion, Operation.REGION_CLEAR, null, - false, region.getMyId(), eventID); - bucketRegion.cmnClearRegion(regionEvent, false, true); - } catch (PartitionOfflineException poe) { - logger.info( - "All members holding data for bucket {} are offline, no more retries will be attempted", - this.bucketId, - poe); - throw poe; - } catch (Exception ex) { - throw new ForceReattemptException( - EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex); - } finally { - bucketRegion.doUnlockForPrimary(); - } - - return true; - } - - @Override - public boolean canStartRemoteTransaction() { - return false; - } - - @Override - protected void sendReply(InternalDistributedMember member, int processorId, - DistributionManager distributionManager, ReplyException ex, - PartitionedRegion partitionedRegion, long startTime) { - if (partitionedRegion != null) { - if (startTime > 0) { - partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime); - } - } - ClearReplyMessage.send(member, processorId, getReplySender(distributionManager), this.result, - ex); - } - - @Override - protected void appendFields(StringBuilder buff) { - super.appendFields(buff); - buff.append("; bucketId=").append(this.bucketId); - } - - public static class ClearReplyMessage extends ReplyMessage { - @Override - public boolean getInlineProcess() { - return true; - } - - /** - * Empty constructor to conform to DataSerializable interface - */ - @SuppressWarnings("unused") - public ClearReplyMessage() {} - - private ClearReplyMessage(int processorId, boolean result, ReplyException ex) { - super(); - setProcessorId(processorId); - if (ex != null) { - setException(ex); - } else { - setReturnValue(result); - } - } - - /** - * Send an ack - */ - public static void send(InternalDistributedMember recipient, int processorId, - ReplySender replySender, - boolean result, ReplyException ex) { - Assert.assertNotNull(recipient, "ClearReplyMessage recipient was NULL."); - ClearReplyMessage message = new ClearReplyMessage(processorId, result, ex); - message.setRecipient(recipient); - replySender.putOutgoing(message); - } - - /** - * Processes this message. This method is invoked by the receiver of the message. - * - * @param distributionManager the distribution manager that is processing the message. - */ - @Override - public void process(final DistributionManager distributionManager, - final ReplyProcessor21 replyProcessor) { - final long startTime = getTimestamp(); - if (replyProcessor == null) { - if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { - logger.trace(LogMarker.DM_VERBOSE, "{}: processor not found", this); - } - return; - } - if (replyProcessor instanceof ClearResponse) { - ((ClearResponse) replyProcessor).setResponse(this); - } - replyProcessor.process(this); - - if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { - logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", replyProcessor, this); - } - distributionManager.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime); - } - - @Override - public int getDSFID() { - return PR_CLEAR_REPLY_MESSAGE; - } - - @Override - public String toString() { - StringBuilder stringBuilder = new StringBuilder(super.toString()); - stringBuilder.append(" returnValue="); - stringBuilder.append(getReturnValue()); - return stringBuilder.toString(); - } - } - - /** - * A processor to capture the value returned by {@link ClearPRMessage} - */ - public static class ClearResponse extends PartitionResponse { - private volatile boolean returnValue; - - public ClearResponse(InternalDistributedSystem distributedSystem, - Set<InternalDistributedMember> recipients) { - super(distributedSystem, recipients, false); - } - - public void setResponse(ClearReplyMessage response) { - if (response.getException() == null) { - this.returnValue = (boolean) response.getReturnValue(); - } - } - - /** - * @return the result of the remote clear operation - * @throws ForceReattemptException if the peer is no longer available - * @throws CacheException if the peer generates an error - */ - public boolean waitForResult() throws CacheException, ForceReattemptException { - waitForCacheException(); - return this.returnValue; - } - } -} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java index fb01334..02c7a9d 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java @@ -42,7 +42,6 @@ import static org.mockito.quality.Strictness.STRICT_STUBS; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -84,7 +83,6 @@ import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.control.InternalResourceManager; -import org.apache.geode.internal.cache.partitioned.ClearPRMessage; import org.apache.geode.internal.cache.partitioned.colocation.ColocationLoggerFactory; import org.apache.geode.test.junit.runners.GeodeParamsRunner; @@ -229,19 +227,6 @@ public class PartitionedRegionTest { } @Test - public void createClearPRMessagesShouldCreateMessagePerBucket() { - PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); - RegionEventImpl regionEvent = - new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false, - spyPartitionedRegion.getMyId(), true); - when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3); - EventID eventID = new EventID(spyPartitionedRegion.getCache().getDistributedSystem()); - List<ClearPRMessage> msgs = spyPartitionedRegion.createClearPRMessages(eventID); - assertThat(msgs.size()).isEqualTo(3); - } - - - @Test public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() { // ARRANGE EntryEventImpl clientEvent = mock(EntryEventImpl.class); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java deleted file mode 100644 index acdd4fc..0000000 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache.partitioned; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.notNull; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.HashSet; -import java.util.Set; - -import org.junit.Before; -import org.junit.Test; - -import org.apache.geode.distributed.internal.ClusterDistributionManager; -import org.apache.geode.distributed.internal.DMStats; -import org.apache.geode.distributed.internal.DistributionManager; -import org.apache.geode.distributed.internal.ReplyException; -import org.apache.geode.distributed.internal.ReplySender; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.cache.BucketRegion; -import org.apache.geode.internal.cache.ForceReattemptException; -import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.PartitionedRegionDataStore; -import org.apache.geode.internal.cache.PartitionedRegionStats; -import org.apache.geode.internal.cache.RegionEventImpl; - -public class ClearPRMessageTest { - - ClearPRMessage message; - PartitionedRegion region; - PartitionedRegionDataStore dataStore; - BucketRegion bucketRegion; - - @Before - public void setup() throws ForceReattemptException { - message = spy(new ClearPRMessage()); - InternalDistributedMember member = mock(InternalDistributedMember.class); - region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS); - dataStore = mock(PartitionedRegionDataStore.class); - when(region.getDataStore()).thenReturn(dataStore); - when(region.getFullPath()).thenReturn("/test"); - bucketRegion = mock(BucketRegion.class); - when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion); - RegionEventImpl bucketRegionEventImpl = mock(RegionEventImpl.class); - } - - @Test - public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() { - when(bucketRegion.isPrimary()).thenReturn(false); - - assertThatThrownBy(() -> message.doLocalClear(region)) - .isInstanceOf(ForceReattemptException.class) - .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); - } - - @Test - public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() { - when(bucketRegion.doLockForPrimary(false)).thenReturn(false); - - assertThatThrownBy(() -> message.doLocalClear(region)) - .isInstanceOf(ForceReattemptException.class) - .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); - } - - @Test - public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() { - NullPointerException exception = new NullPointerException("Error encountered"); - doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean()); - - when(bucketRegion.doLockForPrimary(false)).thenReturn(true); - - assertThatThrownBy(() -> message.doLocalClear(region)) - .isInstanceOf(ForceReattemptException.class) - .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION); - - // Confirm that cmnClearRegion was called - verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean()); - } - - @Test - public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained() - throws ForceReattemptException { - - // Be primary on the first check, then be not primary on the second check - when(bucketRegion.doLockForPrimary(false)).thenReturn(true); - assertThat(message.doLocalClear(region)).isTrue(); - - // Confirm that cmnClearRegion was called - verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean()); - } - - @Test - public void initMessageSetsReplyProcessorCorrectlyWithDefinedReplyProcessor() { - InternalDistributedMember sender = mock(InternalDistributedMember.class); - - Set<InternalDistributedMember> recipients = new HashSet<>(); - recipients.add(sender); - - ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class); - int mockProcessorId = 5; - when(mockProcessor.getProcessorId()).thenReturn(mockProcessorId); - - message.initMessage(region, recipients, mockProcessor); - - verify(mockProcessor, times(1)).enableSevereAlertProcessing(); - assertThat(message.getProcessorId()).isEqualTo(mockProcessorId); - } - - @Test - public void initMessageSetsProcessorIdToZeroWithNullProcessor() { - message.initMessage(region, null, null); - - assertThat(message.getProcessorId()).isEqualTo(0); - } - - @Test - public void sendThrowsExceptionIfPutOutgoingMethodReturnsNonNullSetOfFailures() { - InternalDistributedMember recipient = mock(InternalDistributedMember.class); - - DistributionManager distributionManager = mock(DistributionManager.class); - when(region.getDistributionManager()).thenReturn(distributionManager); - - doNothing().when(message).initMessage(any(), any(), any()); - Set<InternalDistributedMember> failures = new HashSet<>(); - failures.add(recipient); - - when(distributionManager.putOutgoing(message)).thenReturn(failures); - - assertThatThrownBy(() -> message.send(recipient, region)) - .isInstanceOf(ForceReattemptException.class) - .hasMessageContaining("Failed sending <" + message + ">"); - } - - @SuppressWarnings("ResultOfMethodCallIgnored") - @Test - public void operateOnPartitionedRegionCallsSendReplyWithNoExceptionWhenDoLocalClearSucceeds() - throws ForceReattemptException { - ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class); - InternalDistributedMember sender = mock(InternalDistributedMember.class); - int processorId = 1000; - int startTime = 0; - - doReturn(0).when(message).getBucketId(); - doReturn(true).when(message).doLocalClear(region); - doReturn(sender).when(message).getSender(); - doReturn(processorId).when(message).getProcessorId(); - - // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to - // do nothing and verify later that it was called with proper input - doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong()); - - message.operateOnPartitionedRegion(distributionManager, region, startTime); - assertThat(message.result).isTrue(); - - verify(message, times(0)).sendReply(sender, processorId, distributionManager, null, region, - startTime); - } - - @SuppressWarnings("ResultOfMethodCallIgnored") - @Test - public void operateOnPartitionedRegionCallsSendReplyWithExceptionWhenDoLocalClearFailsWithException() - throws ForceReattemptException { - ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class); - InternalDistributedMember sender = mock(InternalDistributedMember.class); - int processorId = 1000; - int startTime = 0; - ForceReattemptException exception = - new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); - - doReturn(0).when(message).getBucketId(); - doThrow(exception).when(message).doLocalClear(region); - doReturn(sender).when(message).getSender(); - doReturn(processorId).when(message).getProcessorId(); - - // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to - // do nothing and verify later that it was called with proper input - doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong()); - - message.operateOnPartitionedRegion(distributionManager, region, startTime); - - verify(message, times(1)).sendReply(any(), anyInt(), any(), notNull(), any(), anyLong()); - } - - @Test - public void sendReplyEndsMessageProcessingIfWeHaveARegionAndHaveStartedProcessing() { - DistributionManager distributionManager = mock(DistributionManager.class); - InternalDistributedMember recipient = mock(InternalDistributedMember.class); - PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class); - when(region.getPrStats()).thenReturn(partitionedRegionStats); - - int processorId = 1000; - int startTime = 10000; - ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); - - ReplySender replySender = mock(ReplySender.class); - doReturn(replySender).when(message).getReplySender(distributionManager); - - message.sendReply(recipient, processorId, distributionManager, exception, region, startTime); - - verify(partitionedRegionStats, times(1)).endPartitionMessagesProcessing(startTime); - } - - @Test - public void sendReplyDoesNotEndMessageProcessingIfStartTimeIsZero() { - DistributionManager distributionManager = mock(DistributionManager.class); - InternalDistributedMember recipient = mock(InternalDistributedMember.class); - PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class); - when(region.getPrStats()).thenReturn(partitionedRegionStats); - - int processorId = 1000; - int startTime = 0; - ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); - - ReplySender replySender = mock(ReplySender.class); - doReturn(replySender).when(message).getReplySender(distributionManager); - - message.sendReply(recipient, processorId, distributionManager, exception, region, startTime); - - verify(partitionedRegionStats, times(0)).endPartitionMessagesProcessing(startTime); - } - - @Test - public void clearReplyMessageProcessCallsSetResponseIfReplyProcessorIsInstanceOfClearResponse() { - DistributionManager distributionManager = mock(DistributionManager.class); - DMStats mockStats = mock(DMStats.class); - when(distributionManager.getStats()).thenReturn(mockStats); - ClearPRMessage.ClearReplyMessage clearReplyMessage = new ClearPRMessage.ClearReplyMessage(); - ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class); - - clearReplyMessage.process(distributionManager, mockProcessor); - - verify(mockProcessor, times(1)).setResponse(clearReplyMessage); - } -}
