[ 
https://issues.apache.org/jira/browse/GEODE-4062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288939#comment-16288939
 ] 

ASF GitHub Bot commented on GEODE-4062:
---------------------------------------

dineshpune2006 closed pull request #1139: Feature/GEODE-4062:clear method 
implementation for ParallelGatewaySender queue
URL: https://github.com/apache/geode/pull/1139
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index 4fc940c94a..54029cfaee 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -26,13 +26,10 @@
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.DistributedRegion;
-import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
-import 
org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
-import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
 import org.apache.geode.internal.size.SingleObjectSizer;
 
 /**
@@ -225,4 +222,16 @@ public long getNumEntriesOverflowOnDiskTestOnly() {
     return ((ParallelGatewaySenderQueue) (processors[0].getQueue()))
         .getNumEntriesOverflowOnDiskTestOnly();
   }
+
+  public void clearQueue() {
+    try {
+      this.sender.getLifeCycleLock().writeLock().lock();
+      for (int i = 0; i < processors.length; i++) {
+
+        ((ParallelGatewaySenderQueue) 
this.processors[i].getQueue()).clearQueue();
+      }
+    } finally {
+      this.sender.getLifeCycleLock().writeLock().unlock();
+    }
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 18883beb87..0505a9b5e6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -53,6 +53,7 @@
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -68,12 +69,14 @@
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegionArguments;
+import org.apache.geode.internal.cache.LocalDataSet;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionDataStore;
 import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.PrimaryBucketException;
+import org.apache.geode.internal.cache.ProxyBucketRegion;
 import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import 
org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
 import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
@@ -1781,4 +1784,46 @@ ParallelGatewaySenderQueueMetaRegion 
newMetataRegion(InternalCache cache, final
       return meta;
     }
   }
+
+  public void clearQueue() {
+
+    try {
+      this.sender.pause();
+      for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+        clearPartitionedRegion((PartitionedRegion) prQ);
+      }
+    } finally {
+      if (this.sender.isPaused())
+        this.sender.resume();
+
+    }
+  }
+
+  // clear the partition region
+  private void clearPartitionedRegion(PartitionedRegion partitionedRegion) {
+    LocalDataSet lds = (LocalDataSet) 
PartitionRegionHelper.getLocalPrimaryData(partitionedRegion);
+    Set<Integer> set = lds.getBucketSet(); // this returns bucket ids in the 
function context
+
+    for (Integer bucketId : set) {
+      Bucket bucket = partitionedRegion.getRegionAdvisor().getBucket(bucketId);
+      if ((bucket instanceof ProxyBucketRegion == false) && bucket instanceof 
BucketRegion) {
+        BucketRegion bucketRegion = (BucketRegion) bucket;
+        ((BucketRegionQueue) 
bucketRegion).getInitializationLock().readLock().lock();
+        clearBucketRegion(bucketRegion);
+        ((BucketRegionQueue) 
bucketRegion).getInitializationLock().readLock().unlock();
+      }
+    }
+  }
+
+  private void clearBucketRegion(BucketRegion bucketRegion) {
+    Set keySet = bucketRegion.keySet();
+    for (Iterator iterator = keySet.iterator(); iterator.hasNext();) {
+      Object key = iterator.next();
+      try {
+        ((BucketRegionQueue) bucketRegion).destroyKey(key);
+      } catch (Exception e) {
+        logger.warn("exception raised in clearBucketRegion" + e.getMessage());
+      }
+    }
+  }
 }
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 0e98dea1d3..2e107b3361 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -271,6 +271,35 @@ public static void createLocator(int dsId, int port, 
Set<String> localLocatorsLi
     test.getSystem(props);
   }
 
+  public static void clearGatewaySender(String senderId) {
+
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    AbstractGatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = (AbstractGatewaySender) s;
+        ConcurrentParallelGatewaySenderQueue r =
+            (ConcurrentParallelGatewaySenderQueue) sender.getQueue();
+        r.clearQueue();
+        break;
+      }
+    }
+  }
+
+  public static Boolean isSenderPaused(String senderId) {
+    Boolean isPaused = false;
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    AbstractGatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = (AbstractGatewaySender) s;
+        isPaused = sender.isPaused();
+        break;
+      }
+    }
+    return isPaused;
+  }
+
   public static Integer createFirstLocatorWithDSId(int dsId) {
     stopOldLocator();
     WANTestBase test = new WANTestBase();
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ClearParallelSenderQueueWhenReceiverNotStartedRegressionTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ClearParallelSenderQueueWhenReceiverNotStartedRegressionTest.java
new file mode 100644
index 0000000000..589dae78f4
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ClearParallelSenderQueueWhenReceiverNotStartedRegressionTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+/**
+ * DUnit for ClearParallelSenderQueueWhenReceiverNotStartedRegressionTest 
operations. GEODE-3937
+ */
+@Category(DistributedTest.class)
+public class ClearParallelSenderQueueWhenReceiverNotStartedRegressionTest 
extends WANTestBase {
+
+  @Test
+  public void testParallelSenderQueueEventsOverflow_NoDiskStoreSpecified() 
throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);// backup-vm is not started in this 
regression
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4);// backup-vm is not started in this 
regression
+
+    vm4.invoke(() -> WANTestBase.createSenderWithoutDiskStore("ln", 2, 10, 10, 
false, true));
+
+    vm4.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), "ln", 
1, 100, isOffHeap()));
+
+    startSenderInVMs("ln", vm4);
+
+    vm4.invoke(() -> WANTestBase.pauseSender("ln"));
+
+    // give some time for the senders to pause
+    Awaitility.await().atMost(1, TimeUnit.MINUTES)
+        .until(() -> vm4.invoke(() -> WANTestBase.isSenderPaused("ln")));
+
+    vm2.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 100, isOffHeap()));
+
+    int numEventPuts = 50;
+    vm4.invoke(() -> WANTestBase.doHeavyPuts(getTestMethodName(), 
numEventPuts));
+
+    long numOvVm4 = (Long) vm4.invoke(() -> 
WANTestBase.getNumberOfEntriesOverflownToDisk("ln"));
+    long numMemVm4 = (Long) vm4.invoke(() -> 
WANTestBase.getNumberOfEntriesInVM("ln"));
+
+    assertEquals("Total number of entries  in VM4 is incorrect", (numEventPuts 
* 1),
+        (numOvVm4 + numMemVm4));
+
+    // clear the queue
+    vm4.invoke(() -> WANTestBase.clearGatewaySender("ln"));
+
+    vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+
+    Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+
+      int numMemVm2 = vm2.invoke(() -> 
WANTestBase.getRegionSize(getTestMethodName()));
+
+      assertEquals("Total number of entries  in VM2 is incorrect", 0, 
numMemVm2);
+
+    });
+
+  }
+
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> clear method implementation for ParallelGatewaySender class
> -----------------------------------------------------------
>
>                 Key: GEODE-4062
>                 URL: https://issues.apache.org/jira/browse/GEODE-4062
>             Project: Geode
>          Issue Type: Bug
>          Components: wan
>            Reporter: dinesh ak
>
> A clear function is missing for ParallelGatewaySender: there is currently no way to clear its queue.
> Why it is needed: when the receiver is stuck, the sender accumulates a large backlog in its queue.
> We need a clear method implementation that clears this queue.
> Future use: we will create a gfsh command that clears the queue using this call.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to