This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-6186
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d0f12632e6fed40c3e6d9aa6c971087650e0501e
Author: Barry Oglesby <[email protected]>
AuthorDate: Tue Dec 11 12:35:05 2018 -0800

    GEODE-6186: Reduced the number of EntryNotFoundExceptions thrown during wan conflation
---
 .../geode/internal/cache/BucketRegionQueue.java    |  54 +++++----
 .../wan/parallel/ParallelGatewaySenderQueue.java   |  20 +++-
 .../internal/cache/BucketRegionQueueJUnitTest.java | 123 +++++++++++++++++++++
 .../wan/parallel/ParallelGatewaySenderHelper.java  |  84 ++++++++++++++
 .../ParallelQueueRemovalMessageJUnitTest.java      |  80 ++------------
 5 files changed, 267 insertions(+), 94 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index a5b67c1..9d95672 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -315,44 +315,56 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
   }
 
   // No need to synchronize because it is called from a synchronized method
-  private void removeIndex(Long qkey) {
+  protected boolean removeIndex(Long qkey) {
     // Determine whether conflation is enabled for this queue and object
+    boolean entryFound;
     Object o = getNoLRU(qkey, true, false, false);
-    if (o instanceof Conflatable) {
-      Conflatable object = (Conflatable) o;
-      if (object.shouldBeConflated()) {
-        // Otherwise, remove the index from the indexes map.
-        String rName = object.getRegionToConflate();
-        Object key = object.getKeyToConflate();
-        Map latestIndexesForRegion = (Map) this.indexes.get(rName);
-        if (latestIndexesForRegion != null) {
-          // Remove the index if appropriate. Verify the qKey is actually the 
one being referenced
-          // in the index. If it isn't, then another event has been received 
for the real key. In
-          // that case, don't remove the index since it has already been 
overwritten.
-          if (latestIndexesForRegion.get(key) == qkey) {
-            Long index = (Long) latestIndexesForRegion.remove(key);
-            if (index != null) {
-              
this.getPartitionedRegion().getParallelGatewaySender().getStatistics()
-                  .decConflationIndexesMapSize();
-              if (logger.isDebugEnabled()) {
-                logger.debug("{}: Removed index {} for {}", this, index, 
object);
+    if (o == null) {
+      entryFound = false;
+    } else {
+      entryFound = true;
+      if (o instanceof Conflatable) {
+        Conflatable object = (Conflatable) o;
+        if (object.shouldBeConflated()) {
+          // Otherwise, remove the index from the indexes map.
+          String rName = object.getRegionToConflate();
+          Object key = object.getKeyToConflate();
+          Map latestIndexesForRegion = (Map) this.indexes.get(rName);
+          if (latestIndexesForRegion != null) {
+            // Remove the index if appropriate. Verify the qKey is actually 
the one being referenced
+            // in the index. If it isn't, then another event has been received 
for the real key. In
+            // that case, don't remove the index since it has already been 
overwritten.
+            if (latestIndexesForRegion.get(key) == qkey) {
+              Long index = (Long) latestIndexesForRegion.remove(key);
+              if (index != null) {
+                
this.getPartitionedRegion().getParallelGatewaySender().getStatistics()
+                    .decConflationIndexesMapSize();
+                if (logger.isDebugEnabled()) {
+                  logger.debug("{}: Removed index {} for {}", this, index, 
object);
+                }
               }
             }
           }
         }
       }
     }
+    return entryFound;
   }
 
   @Override
   public void basicDestroy(final EntryEventImpl event, final boolean 
cacheWrite,
       Object expectedOldValue)
       throws EntryNotFoundException, CacheWriterException, TimeoutException {
+    boolean indexEntryFound = true;
     if (getPartitionedRegion().isConflationEnabled()) {
-      removeIndex((Long) event.getKey());
+      indexEntryFound = containsKey(event.getKey()) && removeIndex((Long) 
event.getKey());
     }
     try {
-      super.basicDestroy(event, cacheWrite, expectedOldValue);
+      if (indexEntryFound) {
+        super.basicDestroy(event, cacheWrite, expectedOldValue);
+      } else {
+        throw new EntryNotFoundException(event.getKey().toString());
+      }
     } finally {
       GatewaySenderEventImpl.release(event.getRawOldValue());
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index a4c1eb5..bcf2f04 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1033,7 +1033,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   }
 
   private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId, 
Object key) {
-    boolean isPrimary = 
prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
     BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);
     // TODO : Make sure we dont need to initalize a bucket
     // before destroying a key from it
@@ -1272,7 +1271,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         }
         // Sleep a bit before trying again.
         try {
-          Thread.sleep(50);
+          Thread.sleep(getTimeToSleep(end - currentTime));
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           break;
@@ -1290,6 +1289,23 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return batch;
   }
 
+  private long getTimeToSleep(long timeToWait) {
+    // Get the minimum of 50 and 5% of the time to wait (which by default is 
1000 ms)
+    long timeToSleep = Math.min(50l, ((long) (timeToWait * 0.05)));
+
+    // If it is 0, then try 50% of the time to wait
+    if (timeToSleep == 0) {
+      timeToSleep = (long) (timeToWait * 0.50);
+    }
+
+    // If it is still 0, use the time to wait
+    if (timeToSleep == 0) {
+      timeToSleep = timeToWait;
+    }
+
+    return timeToSleep;
+  }
+
   private void addPeekedEvents(List<GatewaySenderEventImpl> batch, int 
batchSize) {
     if (this.resetLastPeeked) {
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
new file mode 100644
index 0000000..db74bec
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import 
org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper;
+import org.apache.geode.test.fake.Fakes;
+
+public class BucketRegionQueueJUnitTest {
+
+  private static final String GATEWAY_SENDER_ID = "ny";
+  private static final int BUCKET_ID = 85;
+  private static final long KEY = 198;
+
+  private GemFireCacheImpl cache;
+  private PartitionedRegion queueRegion;
+  private AbstractGatewaySender sender;
+  private PartitionedRegion rootRegion;
+  private BucketRegionQueue bucketRegionQueue;
+
+  @Before
+  public void setUpGemFire() {
+    createCache();
+    createQueueRegion();
+    createGatewaySender();
+    createRootRegion();
+    createBucketRegionQueue();
+  }
+
+  private void createCache() {
+    // Mock cache
+    this.cache = Fakes.cache();
+  }
+
+  private void createQueueRegion() {
+    // Mock queue region
+    this.queueRegion =
+        ParallelGatewaySenderHelper.createMockQueueRegion(this.cache,
+            ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID));
+  }
+
+  private void createGatewaySender() {
+    // Mock gateway sender
+    this.sender = ParallelGatewaySenderHelper.createGatewaySender(this.cache);
+    when(this.queueRegion.getParallelGatewaySender()).thenReturn(this.sender);
+  }
+
+  private void createRootRegion() {
+    // Mock root region
+    this.rootRegion = mock(PartitionedRegion.class);
+    when(this.rootRegion.getFullPath())
+        .thenReturn(Region.SEPARATOR + 
PartitionedRegionHelper.PR_ROOT_REGION_NAME);
+  }
+
+  private void createBucketRegionQueue() {
+    BucketRegionQueue realBucketRegionQueue = ParallelGatewaySenderHelper
+        .createBucketRegionQueue(this.cache, this.rootRegion, 
this.queueRegion, BUCKET_ID);
+    this.bucketRegionQueue = spy(realBucketRegionQueue);
+  }
+
+  @Test
+  public void testBasicDestroyConflationEnabledAndValueInRegionAndIndex() {
+    // Create the event
+    EntryEventImpl event = EntryEventImpl.create(this.bucketRegionQueue, 
Operation.DESTROY,
+        KEY, "value", null, false, mock(DistributedMember.class));
+
+    // Don't allow hasSeenEvent to be invoked
+    doReturn(false).when(this.bucketRegionQueue).hasSeenEvent(event);
+
+    // Set conflation enabled and the appropriate return values for 
containsKey and removeIndex
+    when(this.queueRegion.isConflationEnabled()).thenReturn(true);
+    when(this.bucketRegionQueue.containsKey(KEY)).thenReturn(true);
+    doReturn(true).when(this.bucketRegionQueue).removeIndex(KEY);
+
+    // Invoke basicDestroy
+    this.bucketRegionQueue.basicDestroy(event, true, null);
+
+    // Verify mapDestroy is invoked
+    verify(this.bucketRegionQueue).mapDestroy(event, true, false, null);
+  }
+
+  @Test(expected = EntryNotFoundException.class)
+  public void testBasicDestroyConflationEnabledAndValueNotInRegion() {
+    // Create the event
+    EntryEventImpl event = EntryEventImpl.create(this.bucketRegionQueue, 
Operation.DESTROY,
+        KEY, "value", null, false, mock(DistributedMember.class));
+
+    // Don't allow hasSeenEvent to be invoked
+    doReturn(false).when(this.bucketRegionQueue).hasSeenEvent(event);
+
+    // Set conflation enabled and the appropriate return values for 
containsKey and removeIndex
+    when(this.queueRegion.isConflationEnabled()).thenReturn(true);
+    when(this.bucketRegionQueue.containsKey(KEY)).thenReturn(false);
+
+    // Invoke basicDestroy
+    this.bucketRegionQueue.basicDestroy(event, true, null);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
index a5372db..53529af 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
@@ -14,21 +14,45 @@
  */
 package org.apache.geode.internal.cache.wan.parallel;
 
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.HashSet;
 import java.util.Set;
 
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.internal.cache.BucketAdvisor;
+import org.apache.geode.internal.cache.BucketRegionQueue;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.EvictionAttributesImpl;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalRegionArguments;
 import org.apache.geode.internal.cache.KeyInfo;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.PartitionedRegionStats;
+import org.apache.geode.internal.cache.ProxyBucketRegion;
 import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.eviction.AbstractEvictionController;
+import org.apache.geode.internal.cache.eviction.EvictionController;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 
@@ -66,6 +90,66 @@ public class ParallelGatewaySenderHelper {
     return gsei;
   }
 
+  public static PartitionedRegion createMockQueueRegion(GemFireCacheImpl 
cache, String regionName) {
+    // Mock queue region
+    PartitionedRegion queueRegion = mock(PartitionedRegion.class);
+    when(queueRegion.getFullPath()).thenReturn(regionName);
+    
when(queueRegion.getPrStats()).thenReturn(mock(PartitionedRegionStats.class));
+    
when(queueRegion.getDataStore()).thenReturn(mock(PartitionedRegionDataStore.class));
+    when(queueRegion.getCache()).thenReturn(cache);
+    EvictionAttributesImpl ea = (EvictionAttributesImpl) EvictionAttributes
+        .createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK);
+    EvictionController eviction = AbstractEvictionController.create(ea, false,
+        cache.getDistributedSystem(), "queueRegion");
+    when(queueRegion.getEvictionController()).thenReturn(eviction);
+    return queueRegion;
+  }
+
+  public static BucketRegionQueue createBucketRegionQueue(GemFireCacheImpl 
cache,
+      PartitionedRegion parentRegion, PartitionedRegion queueRegion, int 
bucketId) {
+    // Create InternalRegionArguments
+    InternalRegionArguments ira = new InternalRegionArguments();
+    ira.setPartitionedRegion(queueRegion);
+    ira.setPartitionedRegionBucketRedundancy(1);
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    ira.setBucketAdvisor(ba);
+    InternalRegionArguments pbrIra = new InternalRegionArguments();
+    RegionAdvisor ra = mock(RegionAdvisor.class);
+    when(ra.getPartitionedRegion()).thenReturn(queueRegion);
+    pbrIra.setPartitionedRegionAdvisor(ra);
+    PartitionAttributes pa = mock(PartitionAttributes.class);
+    when(queueRegion.getPartitionAttributes()).thenReturn(pa);
+
+    when(queueRegion.getBucketName(eq(bucketId))).thenAnswer(new 
Answer<String>() {
+      @Override
+      public String answer(final InvocationOnMock invocation) throws Throwable 
{
+        return 
PartitionedRegionHelper.getBucketName(queueRegion.getFullPath(), bucketId);
+      }
+    });
+
+    when(queueRegion.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
+
+    when(pa.getColocatedWith()).thenReturn(null);
+
+    when(ba.getProxyBucketRegion()).thenReturn(mock(ProxyBucketRegion.class));
+
+    // Create RegionAttributes
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    factory.setDataPolicy(DataPolicy.REPLICATE);
+    factory.setEvictionAttributes(
+        EvictionAttributes.createLRUMemoryAttributes(100, null, 
EvictionAction.OVERFLOW_TO_DISK));
+    RegionAttributes attributes = factory.create();
+
+    // Create BucketRegionQueue
+    return new BucketRegionQueue(
+        queueRegion.getBucketName(bucketId), attributes, parentRegion, cache, 
ira);
+  }
+
+  public static String getRegionQueueName(String gatewaySenderId) {
+    return Region.SEPARATOR + gatewaySenderId + 
ParallelGatewaySenderQueue.QSTRING;
+  }
+
   private static EnumListenerEvent getEnumListenerEvent(Operation operation) {
     EnumListenerEvent ele = null;
     if (operation.isCreate()) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index 6a5b495..cbb6551 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -20,7 +20,6 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -36,39 +35,21 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EntryNotFoundException;
-import org.apache.geode.cache.EvictionAction;
-import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionAttributes;
-import org.apache.geode.cache.Scope;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
-import org.apache.geode.internal.cache.BucketAdvisor;
 import org.apache.geode.internal.cache.BucketRegionQueue;
 import org.apache.geode.internal.cache.BucketRegionQueueHelper;
 import org.apache.geode.internal.cache.EntryEventImpl;
-import org.apache.geode.internal.cache.EvictionAttributesImpl;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.InternalRegionArguments;
 import org.apache.geode.internal.cache.KeyInfo;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionDataStore;
 import org.apache.geode.internal.cache.PartitionedRegionHelper;
-import org.apache.geode.internal.cache.PartitionedRegionStats;
-import org.apache.geode.internal.cache.ProxyBucketRegion;
-import org.apache.geode.internal.cache.eviction.AbstractEvictionController;
-import org.apache.geode.internal.cache.eviction.EvictionController;
-import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
@@ -105,16 +86,9 @@ public class ParallelQueueRemovalMessageJUnitTest {
 
   private void createQueueRegion() {
     // Mock queue region
-    this.queueRegion = mock(PartitionedRegion.class);
-    when(this.queueRegion.getFullPath()).thenReturn(getRegionQueueName());
-    
when(this.queueRegion.getPrStats()).thenReturn(mock(PartitionedRegionStats.class));
-    
when(this.queueRegion.getDataStore()).thenReturn(mock(PartitionedRegionDataStore.class));
-    when(this.queueRegion.getCache()).thenReturn(this.cache);
-    EvictionAttributesImpl ea = (EvictionAttributesImpl) EvictionAttributes
-        .createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK);
-    EvictionController eviction = AbstractEvictionController.create(ea, false,
-        this.cache.getDistributedSystem(), "queueRegion");
-    when(this.queueRegion.getEvictionController()).thenReturn(eviction);
+    this.queueRegion =
+        ParallelGatewaySenderHelper.createMockQueueRegion(this.cache,
+            ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID));
   }
 
   private void createGatewaySender() {
@@ -134,47 +108,14 @@ public class ParallelQueueRemovalMessageJUnitTest {
         .thenReturn(Region.SEPARATOR + 
PartitionedRegionHelper.PR_ROOT_REGION_NAME);
     when(this.cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, 
true))
         .thenReturn(this.rootRegion);
-    
when(this.cache.getRegion(getRegionQueueName())).thenReturn(this.queueRegion);
+    
when(this.cache.getRegion(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID)))
+        .thenReturn(this.queueRegion);
   }
 
   private void createBucketRegionQueue() {
-    // Create InternalRegionArguments
-    InternalRegionArguments ira = new InternalRegionArguments();
-    ira.setPartitionedRegion(this.queueRegion);
-    ira.setPartitionedRegionBucketRedundancy(1);
-    BucketAdvisor ba = mock(BucketAdvisor.class);
-    ira.setBucketAdvisor(ba);
-    InternalRegionArguments pbrIra = new InternalRegionArguments();
-    RegionAdvisor ra = mock(RegionAdvisor.class);
-    when(ra.getPartitionedRegion()).thenReturn(this.queueRegion);
-    pbrIra.setPartitionedRegionAdvisor(ra);
-    PartitionAttributes pa = mock(PartitionAttributes.class);
-    when(this.queueRegion.getPartitionAttributes()).thenReturn(pa);
-
-    when(this.queueRegion.getBucketName(eq(BUCKET_ID))).thenAnswer(new 
Answer<String>() {
-      @Override
-      public String answer(final InvocationOnMock invocation) throws Throwable 
{
-        return 
PartitionedRegionHelper.getBucketName(queueRegion.getFullPath(), BUCKET_ID);
-      }
-    });
-
-    when(this.queueRegion.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
-
-    when(pa.getColocatedWith()).thenReturn(null);
-
-    when(ba.getProxyBucketRegion()).thenReturn(mock(ProxyBucketRegion.class));
-
-    // Create RegionAttributes
-    AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_ACK);
-    factory.setDataPolicy(DataPolicy.REPLICATE);
-    factory.setEvictionAttributes(
-        EvictionAttributes.createLRUMemoryAttributes(100, null, 
EvictionAction.OVERFLOW_TO_DISK));
-    RegionAttributes attributes = factory.create();
-
     // Create BucketRegionQueue
-    BucketRegionQueue realBucketRegionQueue = new BucketRegionQueue(
-        this.queueRegion.getBucketName(BUCKET_ID), attributes, 
this.rootRegion, this.cache, ira);
+    BucketRegionQueue realBucketRegionQueue = ParallelGatewaySenderHelper
+        .createBucketRegionQueue(this.cache, this.rootRegion, 
this.queueRegion, BUCKET_ID);
     this.bucketRegionQueue = spy(realBucketRegionQueue);
     // (this.queueRegion.getBucketName(BUCKET_ID), attributes, 
this.rootRegion, this.cache, ira);
     EntryEventImpl entryEvent = EntryEventImpl.create(this.bucketRegionQueue, 
Operation.DESTROY,
@@ -312,7 +253,8 @@ public class ParallelQueueRemovalMessageJUnitTest {
     List<Long> dispatchedKeys = new ArrayList<>();
     dispatchedKeys.add(KEY);
     bucketIdToDispatchedKeys.put(BUCKET_ID, dispatchedKeys);
-    regionToDispatchedKeys.put(getRegionQueueName(), bucketIdToDispatchedKeys);
+    
regionToDispatchedKeys.put(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID),
+        bucketIdToDispatchedKeys);
     return regionToDispatchedKeys;
   }
 
@@ -327,8 +269,4 @@ public class ParallelQueueRemovalMessageJUnitTest {
     tempQueueMap.put(BUCKET_ID, tempQueue);
     return tempQueue;
   }
-
-  private String getRegionQueueName() {
-    return Region.SEPARATOR + GATEWAY_SENDER_ID + 
ParallelGatewaySenderQueue.QSTRING;
-  }
 }

Reply via email to