This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-7682
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 0dca40c48f73f05a2fd732d298cb1820ca38d888
Author: zhouxh <[email protected]>
AuthorDate: Fri Feb 14 17:44:47 2020 -0800

    GEODE-7682: add clear API for PR
---
 .../geode/internal/cache/DistributedRegion.java    |   9 -
 .../apache/geode/internal/cache/LocalRegion.java   |  10 +
 .../geode/internal/cache/PartitionedRegion.java    | 206 +++++++++++++++++++--
 .../internal/cache/partitioned/ClearPRMessage.java |   6 +-
 .../cache/partitioned/ClearPRMessageTest.java      |  14 +-
 5 files changed, 212 insertions(+), 33 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 1a62919..900d85e 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -189,10 +189,6 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
   @MutableForTesting
   public static boolean ignoreReconnect = false;
 
-  /**
-   * Lock to prevent multiple threads on this member from performing a clear 
at the same time.
-   */
-  private final Object clearLock = new Object();
   private final ReentrantReadWriteLock failedInitialImageLock = new 
ReentrantReadWriteLock(true);
 
   @MakeNotStatic
@@ -927,11 +923,6 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
     }
   }
 
-  private void lockCheckReadiness() {
-    cache.getCancelCriterion().checkCancelInProgress(null);
-    checkReadiness();
-  }
-
   @Override
   Object validatedDestroy(Object key, EntryEventImpl event)
       throws TimeoutException, EntryNotFoundException, CacheWriterException {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 6a7e2d2..d5f9156 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -473,6 +473,11 @@ public class LocalRegion extends AbstractRegion implements 
LoaderHelperFactory,
   private final Lock clientMetaDataLock = new ReentrantLock();
 
   /**
+   * Lock to prevent multiple threads on this member from performing a clear 
at the same time.
+   */
+  protected final Object clearLock = new Object();
+
+  /**
    * Lock for updating the cache service profile for the region.
    */
   private final Lock cacheServiceProfileUpdateLock = new ReentrantLock();
@@ -2750,6 +2755,11 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
     checkRegionDestroyed(true);
   }
 
+  // Runs the cache-level cancellation check first, then this region's own readiness check.
+  // NOTE(review): presumably each check throws when the cache is closing or the region is no
+  // longer usable, and returns normally otherwise -- confirm against CancelCriterion and
+  // checkReadiness().
+  protected void lockCheckReadiness() {
+    cache.getCancelCriterion().checkCancelInProgress(null);
+    checkReadiness();
+  }
+
   /**
    * This method should be called when the caller cannot locate an entry and 
that condition is
    * unexpected. This will first double check the cache and region state 
before throwing an
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 2c1ec04..119d2ae 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -2144,18 +2145,194 @@ public class PartitionedRegion extends LocalRegion
     throw new UnsupportedOperationException();
   }
 
-  /**
-   * @since GemFire 5.0
-   * @throws UnsupportedOperationException OVERRIDES
-   */
   @Override
-  public void clear() {
-    throw new UnsupportedOperationException();
+  void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
+    // Clears the whole partitioned region. While holding the member-local clearLock and the
+    // "_clearOperation" lock of the partitioned region lock service, one ClearPRMessage is
+    // dispatched per bucket; afterwards bridge clients are notified once at the PR level.
+    // NOTE(review): cacheWrite is not consulted here -- confirm that is intended.
+    synchronized (clearLock) {
+      final DistributedLockService lockService = getPartitionedRegionLockService();
+      try {
+        lockService.lock("_clearOperation", -1, -1);
+      } catch (IllegalStateException e) {
+        lockCheckReadiness();
+        // Cache and region still look usable, so the failure is unexpected. Propagate it
+        // instead of clearing without the lock and then unlocking a lock never acquired.
+        throw e;
+      }
+      try {
+        // Keys are bucket ids (Integer), values the ClearPRMessage built for that bucket.
+        final Map<?, ?> clearMessages = createClearPRMessages();
+        for (Map.Entry<?, ?> entry : clearMessages.entrySet()) {
+          final Integer bucketId = (Integer) entry.getKey();
+          final ClearPRMessage clearPRMessage = (ClearPRMessage) entry.getValue();
+          // Re-check before every bucket so a closing region stops the clear early.
+          checkReadiness();
+          sendClearMsgByBucket(bucketId, clearPRMessage);
+        }
+      } finally {
+        try {
+          lockService.unlock("_clearOperation");
+        } catch (IllegalStateException e) {
+          // Not rethrown: throwing from finally would mask an exception raised by the clear.
+          lockCheckReadiness();
+        }
+      }
+
+      // notify bridge clients at PR level
+      notifyBridgeClients(regionEvent);
+    }
   }
 
-  @Override
-  void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
-    throw new UnsupportedOperationException();
+  /**
+   * Clears a single bucket, retrying until the clear is reported successful or
+   * {@code retryTimeout} is exceeded.
+   *
+   * <p>
+   * The target member is looked up with {@code getNodeForBucketWrite}. If it is this member
+   * (and {@code localMaxMemory > 0}) the clear is run locally through
+   * {@link ClearPRMessage#doLocalClear}; otherwise the message is sent to the target and the
+   * reply is awaited. If no member hosts the bucket the method returns without doing anything:
+   * the bucket is deliberately not created.
+   *
+   * @param bucketId id of the bucket to clear
+   * @param clearPRMessage the clear message built for {@code bucketId}
+   * @throws TransactionException wrapping a {@code DataLocationException} that is neither a
+   *         {@code ForceReattemptException} nor a {@code PrimaryBucketException}
+   */
+  void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) {
+    RetryTimeKeeper retryTime = null;
+    InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null);
+    if (logger.isDebugEnabled()) {
+      logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId,
+          currentTarget);
+    }
+
+    long timeOut = 0;
+    int count = 0;
+    for (;;) {
+      switch (count) {
+        case 0:
+          // Note we don't check for DM cancellation in common case.
+          // First time. Assume success, keep going.
+          break;
+        case 1:
+          this.cache.getCancelCriterion().checkCancelInProgress(null);
+          // Second time (first failure). Calculate timeout and keep going.
+          timeOut = System.currentTimeMillis() + this.retryTimeout;
+          break;
+        default:
+          this.cache.getCancelCriterion().checkCancelInProgress(null);
+          // test for timeout
+          long timeLeft = timeOut - System.currentTimeMillis();
+          if (timeLeft < 0) {
+            PRHARedundancyProvider.timedOut(this, null, null, "clear bucket " + bucketId,
+                this.retryTimeout);
+            // NOTREACHED
+          }
+
+          // Didn't time out. Sleep a bit and then continue. The interrupt flag is cleared
+          // for the sleep and restored afterwards so callers still observe it.
+          boolean interrupted = Thread.interrupted();
+          try {
+            Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION);
+          } catch (InterruptedException ignore) {
+            interrupted = true;
+          } finally {
+            if (interrupted) {
+              Thread.currentThread().interrupt();
+            }
+          }
+          break;
+      } // switch
+      count++;
+
+      if (currentTarget == null) { // pick target
+        checkReadiness();
+        if (retryTime == null) {
+          retryTime = new RetryTimeKeeper(this.retryTimeout);
+        }
+
+        // A clear has no entry event. With createIfNotExist == false the bucket-creation
+        // path, which sizes the new bucket from the event, is skipped, so null is passed.
+        // NOTE(review): confirm waitForNodeOrCreateBucket has no other use of the event.
+        currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false);
+        if (currentTarget == null) {
+          // the bucket does not exist, no need to clear
+          logger.info("Bucket {} does not contain data, no need to clear", bucketId);
+          return;
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget);
+          }
+        }
+
+        // It's possible this is a GemFire thread e.g. ServerConnection
+        // which got to this point because of a distributed system shutdown or
+        // region closure which uses interrupt to break any sleep() or wait() calls
+        // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception
+        checkShutdown();
+        continue;
+      } // pick target
+
+      try {
+        final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId());
+        if (isLocal) {
+          if (clearPRMessage.doLocalClear(this, bucketId)) {
+            // Cleared locally; without this return the loop would clear again until timeout.
+            return;
+          }
+        } else {
+          ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this);
+          if (response != null) {
+            this.prStats.incPartitionMessagesSent();
+            if (response.waitForResult()) {
+              // The remote member reported success.
+              return;
+            }
+          }
+        }
+        // No response, or an unsuccessful result: fall through and retry (bounded by timeOut).
+      } catch (ForceReattemptException prce) {
+        checkReadiness();
+        InternalDistributedMember lastTarget = currentTarget;
+        if (retryTime == null) {
+          retryTime = new RetryTimeKeeper(this.retryTimeout);
+        }
+        currentTarget = getNodeForBucketWrite(bucketId, retryTime);
+        if (logger.isDebugEnabled()) {
+          logger.debug("PR.sendClearMsgByBucket: Old target was {}, Retrying {}", lastTarget,
+              currentTarget);
+        }
+        if (lastTarget.equals(currentTarget)) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}",
+                currentTarget, prce.getMessage());
+          }
+          if (retryTime.overMaximum()) {
+            PRHARedundancyProvider.timedOut(this, null, null, "clear bucket " + bucketId,
+                this.retryTimeout);
+            // NOTREACHED
+          }
+          retryTime.waitToRetryNode();
+        }
+      } catch (PrimaryBucketException notPrimary) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(),
+              currentTarget);
+        }
+        getRegionAdvisor().notPrimary(bucketId, currentTarget);
+        if (retryTime == null) {
+          retryTime = new RetryTimeKeeper(this.retryTimeout);
+        }
+        currentTarget = getNodeForBucketWrite(bucketId, retryTime);
+      } catch (DataLocationException dle) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("DataLocationException processing clear", dle);
+        }
+        throw new TransactionException(dle);
+      }
+
+      // It's possible this is a GemFire thread e.g. ServerConnection
+      // which got to this point because of a distributed system shutdown or
+      // region closure which uses interrupt to break any sleep() or wait()
+      // calls
+      // e.g. waitForPrimary or waitForBucketRecovery in which case throw
+      // exception
+      checkShutdown();
+
+      // If we get here, the attempt failed...
+      // NOTE(review): these are the putAll retry statistics; no clear-specific statistic is
+      // visible here, so they are kept as-is. Replace once a dedicated statistic exists.
+      if (count == 1) {
+        this.prStats.incPutAllMsgsRetried();
+      }
+      this.prStats.incPutAllRetries();
+    } // for
+    // NOTREACHED
+  }
+
+  /**
+   * Builds one {@link ClearPRMessage} for every bucket id from 0 (inclusive) to
+   * {@code totalNumberOfBuckets} (exclusive).
+   *
+   * <p>
+   * If {@code cache.isCacheAtShutdownAll()} is true, the exception produced by
+   * {@code cache.getCacheClosedException} is thrown before any message is created.
+   *
+   * @return a map from bucket id to the message that clears that bucket
+   */
+  HashMap<Integer, ClearPRMessage> createClearPRMessages() {
+    if (cache.isCacheAtShutdownAll()) {
+      throw cache.getCacheClosedException("Cache is shutting down");
+    }
+
+    final HashMap<Integer, ClearPRMessage> prClearMsgMap = new HashMap<>();
+    for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) {
+      prClearMsgMap.put(bucketId, new ClearPRMessage(bucketId));
+    }
+    return prClearMsgMap;
   }
 
   @Override
@@ -2574,7 +2751,7 @@ public class PartitionedRegion extends LocalRegion
             retryTime = new RetryTimeKeeper(this.retryTimeout);
           }
 
-          currentTarget = waitForNodeOrCreateBucket(retryTime, event, 
bucketId);
+          currentTarget = waitForNodeOrCreateBucket(retryTime, event, 
bucketId, true);
           if (isDebugEnabled) {
             logger.debug("PR.sendMsgByBucket: event size is {}, new 
currentTarget is {}",
                 getEntrySize(event), currentTarget);
@@ -2715,7 +2892,7 @@ public class PartitionedRegion extends LocalRegion
             retryTime = new RetryTimeKeeper(this.retryTimeout);
           }
 
-          currentTarget = waitForNodeOrCreateBucket(retryTime, event, 
bucketId);
+          currentTarget = waitForNodeOrCreateBucket(retryTime, event, 
bucketId, true);
           if (logger.isDebugEnabled()) {
             logger.debug("PR.sendMsgByBucket: event size is {}, new 
currentTarget is {}",
                 getEntrySize(event), currentTarget);
@@ -2960,7 +3137,7 @@ public class PartitionedRegion extends LocalRegion
         if (retryTime == null) {
           retryTime = new RetryTimeKeeper(this.retryTimeout);
         }
-        currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+        currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, 
true);
 
         // It's possible this is a GemFire thread e.g. ServerConnection
         // which got to this point because of a distributed system shutdown or
@@ -3119,10 +3296,11 @@ public class PartitionedRegion extends LocalRegion
    * @param retryTime the RetryTimeKeeper to track retry times
    * @param event the event used to get the entry size in the event a new 
bucket should be created
    * @param bucketId the identity of the bucket should it be created
+   * @param createIfNotExist whether to create the bucket when no node for it is found
    * @return a Node which contains the bucket, potentially null
    */
   private InternalDistributedMember waitForNodeOrCreateBucket(RetryTimeKeeper 
retryTime,
-      EntryEventImpl event, Integer bucketId) {
+                                                              EntryEventImpl 
event, Integer bucketId, boolean createIfNotExist) {
     InternalDistributedMember newNode;
     if (retryTime.overMaximum()) {
       PRHARedundancyProvider.timedOut(this, null, null, "allocate a bucket",
@@ -3132,7 +3310,7 @@ public class PartitionedRegion extends LocalRegion
 
     retryTime.waitForBucketsRecovery();
     newNode = getNodeForBucketWrite(bucketId, retryTime);
-    if (newNode == null) {
+    if (newNode == null && createIfNotExist) {
       newNode = createBucket(bucketId, getEntrySize(event), retryTime);
     }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
index 1a8aba1..44166c3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
@@ -169,7 +169,7 @@ public class ClearPRMessage extends 
PartitionMessageWithDirectReply {
   protected boolean operateOnPartitionedRegion(ClusterDistributionManager 
distributionManager,
       PartitionedRegion region, long startTime) {
     try {
-      result = doLocalClear(region);
+      result = doLocalClear(region, bucketId);
     } catch (ForceReattemptException ex) {
       sendReply(getSender(), getProcessorId(), distributionManager, new 
ReplyException(ex), region,
           startTime);
@@ -179,9 +179,9 @@ public class ClearPRMessage extends 
PartitionMessageWithDirectReply {
     return false;
   }
 
-  public boolean doLocalClear(PartitionedRegion region) throws 
ForceReattemptException {
+  public boolean doLocalClear(PartitionedRegion region, int bucketId) throws 
ForceReattemptException {
     // Retrieve local bucket region which matches target bucketId
-    BucketRegion bucketRegion = 
region.getDataStore().getInitializedBucketForId(null, bucketId);
+    BucketRegion bucketRegion = 
region.getDataStore().getInitializedBucketForId(null, this.bucketId);
 
     // Check if we are primary, throw exception if not
     if (!bucketRegion.isPrimary()) {
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
index 2cf5231..d23edf2 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
@@ -72,7 +72,7 @@ public class ClearPRMessageTest {
   public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() {
     when(bucketRegion.isPrimary()).thenReturn(false);
 
-    assertThatThrownBy(() -> message.doLocalClear(region))
+    assertThatThrownBy(() -> message.doLocalClear(region, 0))
         .isInstanceOf(ForceReattemptException.class)
         .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
   }
@@ -85,7 +85,7 @@ public class ClearPRMessageTest {
     when(mockLockService.lock(anyString(), anyLong(), 
anyLong())).thenReturn(false);
     when(bucketRegion.isPrimary()).thenReturn(true);
 
-    assertThatThrownBy(() -> message.doLocalClear(region))
+    assertThatThrownBy(() -> message.doLocalClear(region, 0))
         .isInstanceOf(ForceReattemptException.class)
         
.hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
   }
@@ -99,7 +99,7 @@ public class ClearPRMessageTest {
     when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false);
     when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
 
-    assertThatThrownBy(() -> message.doLocalClear(region))
+    assertThatThrownBy(() -> message.doLocalClear(region, 0))
         .isInstanceOf(ForceReattemptException.class)
         .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
     // Confirm that we actually obtained and released the lock
@@ -118,7 +118,7 @@ public class ClearPRMessageTest {
     when(bucketRegion.isPrimary()).thenReturn(true);
     when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
 
-    assertThatThrownBy(() -> message.doLocalClear(region))
+    assertThatThrownBy(() -> message.doLocalClear(region, 
bucketRegion.getId()))
         .isInstanceOf(ForceReattemptException.class)
         
.hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION);
 
@@ -136,7 +136,7 @@ public class ClearPRMessageTest {
     // Be primary on the first check, then be not primary on the second check
     when(bucketRegion.isPrimary()).thenReturn(true);
     when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
-    assertThat(message.doLocalClear(region)).isTrue();
+    assertThat(message.doLocalClear(region, 0)).isTrue();
 
     // Confirm that cmnClearRegion was called
     verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), 
anyBoolean());
@@ -197,7 +197,7 @@ public class ClearPRMessageTest {
     int processorId = 1000;
     int startTime = 0;
 
-    doReturn(true).when(message).doLocalClear(region);
+    doReturn(true).when(message).doLocalClear(region, 0);
     doReturn(sender).when(message).getSender();
     doReturn(processorId).when(message).getProcessorId();
 
@@ -222,7 +222,7 @@ public class ClearPRMessageTest {
     ForceReattemptException exception =
         new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
 
-    doThrow(exception).when(message).doLocalClear(region);
+    doThrow(exception).when(message).doLocalClear(region, 0);
     doReturn(sender).when(message).getSender();
     doReturn(processorId).when(message).getProcessorId();
 

Reply via email to