This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-7682
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 0dca40c48f73f05a2fd732d298cb1820ca38d888 Author: zhouxh <[email protected]> AuthorDate: Fri Feb 14 17:44:47 2020 -0800 GEODE-7682: add clear API for PR --- .../geode/internal/cache/DistributedRegion.java | 9 - .../apache/geode/internal/cache/LocalRegion.java | 10 + .../geode/internal/cache/PartitionedRegion.java | 206 +++++++++++++++++++-- .../internal/cache/partitioned/ClearPRMessage.java | 6 +- .../cache/partitioned/ClearPRMessageTest.java | 14 +- 5 files changed, 212 insertions(+), 33 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 1a62919..900d85e 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -189,10 +189,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute @MutableForTesting public static boolean ignoreReconnect = false; - /** - * Lock to prevent multiple threads on this member from performing a clear at the same time. 
- */ - private final Object clearLock = new Object(); private final ReentrantReadWriteLock failedInitialImageLock = new ReentrantReadWriteLock(true); @MakeNotStatic @@ -927,11 +923,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute } } - private void lockCheckReadiness() { - cache.getCancelCriterion().checkCancelInProgress(null); - checkReadiness(); - } - @Override Object validatedDestroy(Object key, EntryEventImpl event) throws TimeoutException, EntryNotFoundException, CacheWriterException { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 6a7e2d2..d5f9156 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -473,6 +473,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, private final Lock clientMetaDataLock = new ReentrantLock(); /** + * Lock to prevent multiple threads on this member from performing a clear at the same time. + */ + protected final Object clearLock = new Object(); + + /** * Lock for updating the cache service profile for the region. */ private final Lock cacheServiceProfileUpdateLock = new ReentrantLock(); @@ -2750,6 +2755,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, checkRegionDestroyed(true); } + protected void lockCheckReadiness() { + cache.getCancelCriterion().checkCancelInProgress(null); + checkReadiness(); + } + /** * This method should be called when the caller cannot locate an entry and that condition is * unexpected. 
This will first double check the cache and region state before throwing an diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 2c1ec04..119d2ae 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; +import org.apache.geode.internal.cache.partitioned.ClearPRMessage; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; @@ -2144,18 +2145,194 @@ public class PartitionedRegion extends LocalRegion throw new UnsupportedOperationException(); } - /** - * @since GemFire 5.0 - * @throws UnsupportedOperationException OVERRIDES - */ @Override - public void clear() { - throw new UnsupportedOperationException(); + void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) { + synchronized (clearLock) { + final DistributedLockService lockService = getPartitionedRegionLockService(); + try { + lockService.lock("_clearOperation", -1, -1); + } catch (IllegalStateException e) { + lockCheckReadiness(); + } + try { + // create ClearPRMessage per bucket + HashMap prClearMsgMap = createClearPRMessages(); + Iterator itor = prClearMsgMap.entrySet().iterator(); + while (itor.hasNext()) { + Map.Entry mapEntry = (Map.Entry) itor.next(); + Integer bucketId = (Integer) mapEntry.getKey(); + ClearPRMessage clearPRMessage = (ClearPRMessage)mapEntry.getValue(); + checkReadiness(); + try { +// clearPRMessage.send() + } + } + } finally { + try { + lockService.unlock("_clearOperation"); + } catch (IllegalStateException e) { + lockCheckReadiness(); + } + } + + // notify bridge clients at PR level + notifyBridgeClients(regionEvent); + } } - @Override - void 
basicClear(RegionEventImpl regionEvent, boolean cacheWrite) { - throw new UnsupportedOperationException(); + void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) { + RetryTimeKeeper retryTime = null; + InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null); + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId, currentTarget); + } + + long timeOut = 0; + int count = 0; + for (;;) { + switch (count) { + case 0: + // Note we don't check for DM cancellation in common case. + // First time. Assume success, keep going. + break; + case 1: + this.cache.getCancelCriterion().checkCancelInProgress(null); + // Second time (first failure). Calculate timeout and keep going. + timeOut = System.currentTimeMillis() + this.retryTimeout; + break; + default: + this.cache.getCancelCriterion().checkCancelInProgress(null); + // test for timeout + long timeLeft = timeOut - System.currentTimeMillis(); + if (timeLeft < 0) { + PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket" + bucketId, + this.retryTimeout); + // NOTREACHED + } + + // Didn't time out. 
Sleep a bit and then continue + boolean interrupted = Thread.interrupted(); + try { + Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); + } catch (InterruptedException ignore) { + interrupted = true; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + break; + } // switch + count++; + + if (currentTarget == null) { // pick target + checkReadiness(); + if (retryTime == null) { + retryTime = new RetryTimeKeeper(this.retryTimeout); + } + + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, false); + if (currentTarget == null) { + // the bucket does not exist, no need to clear + logger.info("Bucket "+bucketId+" does not contain data, no need to clear"); + return; + } else { + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget); + } + } + + // It's possible this is a GemFire thread e.g. ServerConnection + // which got to this point because of a distributed system shutdown or + // region closure which uses interrupt to break any sleep() or wait() calls + // e.g. 
waitForPrimary or waitForBucketRecovery in which case throw exception + checkShutdown(); + continue; + } // pick target + + try { + final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId()); + if (isLocal) { + clearPRMessage.doLocalClear(this, bucketId); + } else { + ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this); + if (response != null) { + this.prStats.incPartitionMessagesSent(); + boolean result = response.waitForResult(); + } + } + } catch (ForceReattemptException prce) { + checkReadiness(); + InternalDistributedMember lastTarget = currentTarget; + if (retryTime == null) { + retryTime = new RetryTimeKeeper(this.retryTimeout); + } + currentTarget = getNodeForBucketWrite(bucketId, retryTime); + if (logger.isDebugEnabled()) { + logger.debug("PR.sendMsgByBucket: Old target was {}, Retrying {}", lastTarget, + currentTarget); + } + if (lastTarget.equals(currentTarget)) { + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}", + currentTarget, prce.getMessage()); + } + if (retryTime.overMaximum()) { + PRHARedundancyProvider.timedOut(this, null, null, "update an entry", + this.retryTimeout); + // NOTREACHED + } + retryTime.waitToRetryNode(); + } + event.setPossibleDuplicate(true); + if (prMsg != null) { + prMsg.setPossibleDuplicate(true); + } + } catch (PrimaryBucketException notPrimary) { + if (logger.isDebugEnabled()) { + logger.debug("Bucket {} on Node {} not primnary", notPrimary.getLocalizedMessage(), + currentTarget); + } + getRegionAdvisor().notPrimary(bucketId, currentTarget); + if (retryTime == null) { + retryTime = new RetryTimeKeeper(this.retryTimeout); + } + currentTarget = getNodeForBucketWrite(bucketId, retryTime); + } catch (DataLocationException dle) { + if (logger.isDebugEnabled()) { + logger.debug("DataLocationException processing putAll", dle); + } + throw new TransactionException(dle); + } + + // It's possible this is a GemFire 
thread e.g. ServerConnection + // which got to this point because of a distributed system shutdown or + // region closure which uses interrupt to break any sleep() or wait() + // calls + // e.g. waitForPrimary or waitForBucketRecovery in which case throw + // exception + checkShutdown(); + + // If we get here, the attempt failed... + if (count == 1) { + this.prStats.incPutAllMsgsRetried(); + } + this.prStats.incPutAllRetries(); + } // for + // NOTREACHED + } + + HashMap createClearPRMessages() { + if (cache.isCacheAtShutdownAll()) { + throw cache.getCacheClosedException("Cache is shutting down"); + } + + HashMap prClearMsgMap = new HashMap(); + for (int bucketId=0; bucketId<this.totalNumberOfBuckets; bucketId++) { + ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId); + prClearMsgMap.put(bucketId, clearPRMessage); + } + return prClearMsgMap; } @Override @@ -2574,7 +2751,7 @@ public class PartitionedRegion extends LocalRegion retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true); if (isDebugEnabled) { logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}", getEntrySize(event), currentTarget); @@ -2715,7 +2892,7 @@ public class PartitionedRegion extends LocalRegion retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true); if (logger.isDebugEnabled()) { logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}", getEntrySize(event), currentTarget); @@ -2960,7 +3137,7 @@ public class PartitionedRegion extends LocalRegion if (retryTime == null) { retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, 
event, bucketId, true); // It's possible this is a GemFire thread e.g. ServerConnection // which got to this point because of a distributed system shutdown or @@ -3119,10 +3296,11 @@ public class PartitionedRegion extends LocalRegion * @param retryTime the RetryTimeKeeper to track retry times * @param event the event used to get the entry size in the event a new bucket should be created * @param bucketId the identity of the bucket should it be created + * @param createIfNotExist * @return a Node which contains the bucket, potentially null */ private InternalDistributedMember waitForNodeOrCreateBucket(RetryTimeKeeper retryTime, - EntryEventImpl event, Integer bucketId) { + EntryEventImpl event, Integer bucketId, boolean createIfNotExist) { InternalDistributedMember newNode; if (retryTime.overMaximum()) { PRHARedundancyProvider.timedOut(this, null, null, "allocate a bucket", @@ -3132,7 +3310,7 @@ public class PartitionedRegion extends LocalRegion retryTime.waitForBucketsRecovery(); newNode = getNodeForBucketWrite(bucketId, retryTime); - if (newNode == null) { + if (newNode == null && createIfNotExist) { newNode = createBucket(bucketId, getEntrySize(event), retryTime); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java index 1a8aba1..44166c3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java @@ -169,7 +169,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager, PartitionedRegion region, long startTime) { try { - result = doLocalClear(region); + result = doLocalClear(region, bucketId); } catch (ForceReattemptException ex) { sendReply(getSender(), getProcessorId(), 
distributionManager, new ReplyException(ex), region, startTime); @@ -179,9 +179,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { return false; } - public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException { + public boolean doLocalClear(PartitionedRegion region, int bucketId) throws ForceReattemptException { // Retrieve local bucket region which matches target bucketId - BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId); + BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, this.bucketId); // Check if we are primary, throw exception if not if (!bucketRegion.isPrimary()) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java index 2cf5231..d23edf2 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java @@ -72,7 +72,7 @@ public class ClearPRMessageTest { public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() { when(bucketRegion.isPrimary()).thenReturn(false); - assertThatThrownBy(() -> message.doLocalClear(region)) + assertThatThrownBy(() -> message.doLocalClear(region, 0)) .isInstanceOf(ForceReattemptException.class) .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); } @@ -85,7 +85,7 @@ public class ClearPRMessageTest { when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false); when(bucketRegion.isPrimary()).thenReturn(true); - assertThatThrownBy(() -> message.doLocalClear(region)) + assertThatThrownBy(() -> message.doLocalClear(region, 0)) .isInstanceOf(ForceReattemptException.class) .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE); } @@ -99,7 +99,7 @@ public class 
ClearPRMessageTest { when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false); when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); - assertThatThrownBy(() -> message.doLocalClear(region)) + assertThatThrownBy(() -> message.doLocalClear(region, 0)) .isInstanceOf(ForceReattemptException.class) .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); // Confirm that we actually obtained and released the lock @@ -118,7 +118,7 @@ public class ClearPRMessageTest { when(bucketRegion.isPrimary()).thenReturn(true); when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); - assertThatThrownBy(() -> message.doLocalClear(region)) + assertThatThrownBy(() -> message.doLocalClear(region, bucketRegion.getId())) .isInstanceOf(ForceReattemptException.class) .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION); @@ -136,7 +136,7 @@ public class ClearPRMessageTest { // Be primary on the first check, then be not primary on the second check when(bucketRegion.isPrimary()).thenReturn(true); when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); - assertThat(message.doLocalClear(region)).isTrue(); + assertThat(message.doLocalClear(region, 0)).isTrue(); // Confirm that cmnClearRegion was called verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean()); @@ -197,7 +197,7 @@ public class ClearPRMessageTest { int processorId = 1000; int startTime = 0; - doReturn(true).when(message).doLocalClear(region); + doReturn(true).when(message).doLocalClear(region, 0); doReturn(sender).when(message).getSender(); doReturn(processorId).when(message).getProcessorId(); @@ -222,7 +222,7 @@ public class ClearPRMessageTest { ForceReattemptException exception = new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); - doThrow(exception).when(message).doLocalClear(region); + doThrow(exception).when(message).doLocalClear(region, 0); doReturn(sender).when(message).getSender(); 
doReturn(processorId).when(message).getProcessorId();
