This is an automated email from the ASF dual-hosted git repository. boglesby pushed a commit to branch feature/GEODE-6186 in repository https://gitbox.apache.org/repos/asf/geode.git
commit d0f12632e6fed40c3e6d9aa6c971087650e0501e Author: Barry Oglesby <[email protected]> AuthorDate: Tue Dec 11 12:35:05 2018 -0800 GEODE-6186: Reduced the number of EntryNotFoundExceptions thrown during wan conflation --- .../geode/internal/cache/BucketRegionQueue.java | 54 +++++---- .../wan/parallel/ParallelGatewaySenderQueue.java | 20 +++- .../internal/cache/BucketRegionQueueJUnitTest.java | 123 +++++++++++++++++++++ .../wan/parallel/ParallelGatewaySenderHelper.java | 84 ++++++++++++++ .../ParallelQueueRemovalMessageJUnitTest.java | 80 ++------------ 5 files changed, 267 insertions(+), 94 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index a5b67c1..9d95672 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -315,44 +315,56 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { } // No need to synchronize because it is called from a synchronized method - private void removeIndex(Long qkey) { + protected boolean removeIndex(Long qkey) { // Determine whether conflation is enabled for this queue and object + boolean entryFound; Object o = getNoLRU(qkey, true, false, false); - if (o instanceof Conflatable) { - Conflatable object = (Conflatable) o; - if (object.shouldBeConflated()) { - // Otherwise, remove the index from the indexes map. - String rName = object.getRegionToConflate(); - Object key = object.getKeyToConflate(); - Map latestIndexesForRegion = (Map) this.indexes.get(rName); - if (latestIndexesForRegion != null) { - // Remove the index if appropriate. Verify the qKey is actually the one being referenced - // in the index. If it isn't, then another event has been received for the real key. In - // that case, don't remove the index since it has already been overwritten. - if (latestIndexesForRegion.get(key) == qkey) { - Long index = (Long) latestIndexesForRegion.remove(key); - if (index != null) { - this.getPartitionedRegion().getParallelGatewaySender().getStatistics() - .decConflationIndexesMapSize(); - if (logger.isDebugEnabled()) { - logger.debug("{}: Removed index {} for {}", this, index, object); + if (o == null) { + entryFound = false; + } else { + entryFound = true; + if (o instanceof Conflatable) { + Conflatable object = (Conflatable) o; + if (object.shouldBeConflated()) { + // Otherwise, remove the index from the indexes map. + String rName = object.getRegionToConflate(); + Object key = object.getKeyToConflate(); + Map latestIndexesForRegion = (Map) this.indexes.get(rName); + if (latestIndexesForRegion != null) { + // Remove the index if appropriate. Verify the qKey is actually the one being referenced + // in the index. If it isn't, then another event has been received for the real key. In + // that case, don't remove the index since it has already been overwritten. + if (latestIndexesForRegion.get(key) == qkey) { + Long index = (Long) latestIndexesForRegion.remove(key); + if (index != null) { + this.getPartitionedRegion().getParallelGatewaySender().getStatistics() + .decConflationIndexesMapSize(); + if (logger.isDebugEnabled()) { + logger.debug("{}: Removed index {} for {}", this, index, object); + } } } } } } } + return entryFound; } @Override public void basicDestroy(final EntryEventImpl event, final boolean cacheWrite, Object expectedOldValue) throws EntryNotFoundException, CacheWriterException, TimeoutException { + boolean indexEntryFound = true; if (getPartitionedRegion().isConflationEnabled()) { - removeIndex((Long) event.getKey()); + indexEntryFound = containsKey(event.getKey()) && removeIndex((Long) event.getKey()); } try { - super.basicDestroy(event, cacheWrite, expectedOldValue); + if (indexEntryFound) { + super.basicDestroy(event, cacheWrite, expectedOldValue); + } else { + throw new EntryNotFoundException(event.getKey().toString()); + } } finally { GatewaySenderEventImpl.release(event.getRawOldValue()); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index a4c1eb5..bcf2f04 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1033,7 +1033,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId, Object key) { - boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId); // TODO : Make sure we dont need to initalize a bucket // before destroying a key from it @@ -1272,7 +1271,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } // Sleep a bit before trying again. try { - Thread.sleep(50); + Thread.sleep(getTimeToSleep(end - currentTime)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; @@ -1290,6 +1289,23 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return batch; } + private long getTimeToSleep(long timeToWait) { + // Get the minimum of 50 and 5% of the time to wait (which by default is 1000 ms) + long timeToSleep = Math.min(50l, ((long) (timeToWait * 0.05))); + + // If it is 0, then try 50% of the time to wait + if (timeToSleep == 0) { + timeToSleep = (long) (timeToWait * 0.50); + } + + // If it is still 0, use the time to wait + if (timeToSleep == 0) { + timeToSleep = timeToWait; + } + + return timeToSleep; + } + private void addPeekedEvents(List<GatewaySenderEventImpl> batch, int batchSize) { if (this.resetLastPeeked) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java new file mode 100644 index 0000000..db74bec --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.Region; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper; +import org.apache.geode.test.fake.Fakes; + +public class BucketRegionQueueJUnitTest { + + private static final String GATEWAY_SENDER_ID = "ny"; + private static final int BUCKET_ID = 85; + private static final long KEY = 198; + + private GemFireCacheImpl cache; + private PartitionedRegion queueRegion; + private AbstractGatewaySender sender; + private PartitionedRegion rootRegion; + private BucketRegionQueue bucketRegionQueue; + + @Before + public void setUpGemFire() { + createCache(); + createQueueRegion(); + createGatewaySender(); + createRootRegion(); + createBucketRegionQueue(); + } + + private void createCache() { + // Mock cache + this.cache = Fakes.cache(); + } + + private void createQueueRegion() { + // Mock queue region + this.queueRegion = + ParallelGatewaySenderHelper.createMockQueueRegion(this.cache, + ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID)); + } + + private void createGatewaySender() { + // Mock gateway sender + this.sender = ParallelGatewaySenderHelper.createGatewaySender(this.cache); + when(this.queueRegion.getParallelGatewaySender()).thenReturn(this.sender); + } + + private void createRootRegion() { + // Mock root region + this.rootRegion = mock(PartitionedRegion.class); + when(this.rootRegion.getFullPath()) + .thenReturn(Region.SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME); + } + + private void createBucketRegionQueue() { + BucketRegionQueue realBucketRegionQueue = ParallelGatewaySenderHelper + .createBucketRegionQueue(this.cache, this.rootRegion, this.queueRegion, BUCKET_ID); + this.bucketRegionQueue = spy(realBucketRegionQueue); + } + + @Test + public void testBasicDestroyConflationEnabledAndValueInRegionAndIndex() { + // Create the event + EntryEventImpl event = EntryEventImpl.create(this.bucketRegionQueue, Operation.DESTROY, + KEY, "value", null, false, mock(DistributedMember.class)); + + // Don't allow hasSeenEvent to be invoked + doReturn(false).when(this.bucketRegionQueue).hasSeenEvent(event); + + // Set conflation enabled and the appropriate return values for containsKey and removeIndex + when(this.queueRegion.isConflationEnabled()).thenReturn(true); + when(this.bucketRegionQueue.containsKey(KEY)).thenReturn(true); + doReturn(true).when(this.bucketRegionQueue).removeIndex(KEY); + + // Invoke basicDestroy + this.bucketRegionQueue.basicDestroy(event, true, null); + + // Verify mapDestroy is invoked + verify(this.bucketRegionQueue).mapDestroy(event, true, false, null); + } + + @Test(expected = EntryNotFoundException.class) + public void testBasicDestroyConflationEnabledAndValueNotInRegion() { + // Create the event + EntryEventImpl event = EntryEventImpl.create(this.bucketRegionQueue, Operation.DESTROY, + KEY, "value", null, false, mock(DistributedMember.class)); + + // Don't allow hasSeenEvent to be invoked + doReturn(false).when(this.bucketRegionQueue).hasSeenEvent(event); + + // Set conflation enabled and the appropriate return values for containsKey and removeIndex + when(this.queueRegion.isConflationEnabled()).thenReturn(true); + when(this.bucketRegionQueue.containsKey(KEY)).thenReturn(false); + + // Invoke basicDestroy + this.bucketRegionQueue.basicDestroy(event, true, null); + } +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java index a5372db..53529af 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java @@ -14,21 +14,45 @@ */ package org.apache.geode.internal.cache.wan.parallel; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.HashSet; import java.util.Set; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.EvictionAction; +import org.apache.geode.cache.EvictionAttributes; import org.apache.geode.cache.Operation; +import org.apache.geode.cache.PartitionAttributes; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.Scope; +import org.apache.geode.internal.cache.BucketAdvisor; +import org.apache.geode.internal.cache.BucketRegionQueue; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EnumListenerEvent; import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.EvictionAttributesImpl; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalRegionArguments; import org.apache.geode.internal.cache.KeyInfo; import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionDataStore; +import org.apache.geode.internal.cache.PartitionedRegionHelper; +import org.apache.geode.internal.cache.PartitionedRegionStats; +import org.apache.geode.internal.cache.ProxyBucketRegion; import org.apache.geode.internal.cache.RegionQueue; +import org.apache.geode.internal.cache.eviction.AbstractEvictionController; +import org.apache.geode.internal.cache.eviction.EvictionController; +import org.apache.geode.internal.cache.partitioned.RegionAdvisor; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; @@ -66,6 +90,66 @@ public class ParallelGatewaySenderHelper { return gsei; } + public static PartitionedRegion createMockQueueRegion(GemFireCacheImpl cache, String regionName) { + // Mock queue region + PartitionedRegion queueRegion = mock(PartitionedRegion.class); + when(queueRegion.getFullPath()).thenReturn(regionName); + when(queueRegion.getPrStats()).thenReturn(mock(PartitionedRegionStats.class)); + when(queueRegion.getDataStore()).thenReturn(mock(PartitionedRegionDataStore.class)); + when(queueRegion.getCache()).thenReturn(cache); + EvictionAttributesImpl ea = (EvictionAttributesImpl) EvictionAttributes + .createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK); + EvictionController eviction = AbstractEvictionController.create(ea, false, + cache.getDistributedSystem(), "queueRegion"); + when(queueRegion.getEvictionController()).thenReturn(eviction); + return queueRegion; + } + + public static BucketRegionQueue createBucketRegionQueue(GemFireCacheImpl cache, + PartitionedRegion parentRegion, PartitionedRegion queueRegion, int bucketId) { + // Create InternalRegionArguments + InternalRegionArguments ira = new InternalRegionArguments(); + ira.setPartitionedRegion(queueRegion); + ira.setPartitionedRegionBucketRedundancy(1); + BucketAdvisor ba = mock(BucketAdvisor.class); + ira.setBucketAdvisor(ba); + InternalRegionArguments pbrIra = new InternalRegionArguments(); + RegionAdvisor ra = mock(RegionAdvisor.class); + when(ra.getPartitionedRegion()).thenReturn(queueRegion); + pbrIra.setPartitionedRegionAdvisor(ra); + PartitionAttributes pa = mock(PartitionAttributes.class); + when(queueRegion.getPartitionAttributes()).thenReturn(pa); + + when(queueRegion.getBucketName(eq(bucketId))).thenAnswer(new Answer<String>() { + @Override + public String answer(final InvocationOnMock invocation) throws Throwable { + return PartitionedRegionHelper.getBucketName(queueRegion.getFullPath(), bucketId); + } + }); + + when(queueRegion.getDataPolicy()).thenReturn(DataPolicy.PARTITION); + + when(pa.getColocatedWith()).thenReturn(null); + + when(ba.getProxyBucketRegion()).thenReturn(mock(ProxyBucketRegion.class)); + + // Create RegionAttributes + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setDataPolicy(DataPolicy.REPLICATE); + factory.setEvictionAttributes( + EvictionAttributes.createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK)); + RegionAttributes attributes = factory.create(); + + // Create BucketRegionQueue + return new BucketRegionQueue( + queueRegion.getBucketName(bucketId), attributes, parentRegion, cache, ira); + } + + public static String getRegionQueueName(String gatewaySenderId) { + return Region.SEPARATOR + gatewaySenderId + ParallelGatewaySenderQueue.QSTRING; + } + private static EnumListenerEvent getEnumListenerEvent(Operation operation) { EnumListenerEvent ele = null; if (operation.isCreate()) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java index 6a5b495..cbb6551 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java @@ -20,7 +20,6 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -36,39 +35,21 @@ import java.util.concurrent.LinkedBlockingQueue; import org.junit.Before; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.apache.geode.cache.AttributesFactory; -import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.EntryNotFoundException; -import org.apache.geode.cache.EvictionAction; -import org.apache.geode.cache.EvictionAttributes; import org.apache.geode.cache.Operation; -import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionAttributes; -import org.apache.geode.cache.Scope; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.internal.cache.AbstractBucketRegionQueue; -import org.apache.geode.internal.cache.BucketAdvisor; import org.apache.geode.internal.cache.BucketRegionQueue; import org.apache.geode.internal.cache.BucketRegionQueueHelper; import org.apache.geode.internal.cache.EntryEventImpl; -import org.apache.geode.internal.cache.EvictionAttributesImpl; import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.GemFireCacheImpl; -import org.apache.geode.internal.cache.InternalRegionArguments; import org.apache.geode.internal.cache.KeyInfo; import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.PartitionedRegionDataStore; import org.apache.geode.internal.cache.PartitionedRegionHelper; -import org.apache.geode.internal.cache.PartitionedRegionStats; -import org.apache.geode.internal.cache.ProxyBucketRegion; -import org.apache.geode.internal.cache.eviction.AbstractEvictionController; -import org.apache.geode.internal.cache.eviction.EvictionController; -import org.apache.geode.internal.cache.partitioned.RegionAdvisor; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.GatewaySenderStats; @@ -105,16 +86,9 @@ public class ParallelQueueRemovalMessageJUnitTest { private void createQueueRegion() { // Mock queue region - this.queueRegion = mock(PartitionedRegion.class); - when(this.queueRegion.getFullPath()).thenReturn(getRegionQueueName()); - when(this.queueRegion.getPrStats()).thenReturn(mock(PartitionedRegionStats.class)); - when(this.queueRegion.getDataStore()).thenReturn(mock(PartitionedRegionDataStore.class)); - when(this.queueRegion.getCache()).thenReturn(this.cache); - EvictionAttributesImpl ea = (EvictionAttributesImpl) EvictionAttributes - .createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK); - EvictionController eviction = AbstractEvictionController.create(ea, false, - this.cache.getDistributedSystem(), "queueRegion"); - when(this.queueRegion.getEvictionController()).thenReturn(eviction); + this.queueRegion = + ParallelGatewaySenderHelper.createMockQueueRegion(this.cache, + ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID)); } private void createGatewaySender() { @@ -134,47 +108,14 @@ public class ParallelQueueRemovalMessageJUnitTest { .thenReturn(Region.SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME); when(this.cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)) .thenReturn(this.rootRegion); - when(this.cache.getRegion(getRegionQueueName())).thenReturn(this.queueRegion); + when(this.cache.getRegion(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID))) + .thenReturn(this.queueRegion); } private void createBucketRegionQueue() { - // Create InternalRegionArguments - InternalRegionArguments ira = new InternalRegionArguments(); - ira.setPartitionedRegion(this.queueRegion); - ira.setPartitionedRegionBucketRedundancy(1); - BucketAdvisor ba = mock(BucketAdvisor.class); - ira.setBucketAdvisor(ba); - InternalRegionArguments pbrIra = new InternalRegionArguments(); - RegionAdvisor ra = mock(RegionAdvisor.class); - when(ra.getPartitionedRegion()).thenReturn(this.queueRegion); - pbrIra.setPartitionedRegionAdvisor(ra); - PartitionAttributes pa = mock(PartitionAttributes.class); - when(this.queueRegion.getPartitionAttributes()).thenReturn(pa); - - when(this.queueRegion.getBucketName(eq(BUCKET_ID))).thenAnswer(new Answer<String>() { - @Override - public String answer(final InvocationOnMock invocation) throws Throwable { - return PartitionedRegionHelper.getBucketName(queueRegion.getFullPath(), BUCKET_ID); - } - }); - - when(this.queueRegion.getDataPolicy()).thenReturn(DataPolicy.PARTITION); - - when(pa.getColocatedWith()).thenReturn(null); - - when(ba.getProxyBucketRegion()).thenReturn(mock(ProxyBucketRegion.class)); - - // Create RegionAttributes - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setDataPolicy(DataPolicy.REPLICATE); - factory.setEvictionAttributes( - EvictionAttributes.createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK)); - RegionAttributes attributes = factory.create(); - // Create BucketRegionQueue - BucketRegionQueue realBucketRegionQueue = new BucketRegionQueue( - this.queueRegion.getBucketName(BUCKET_ID), attributes, this.rootRegion, this.cache, ira); + BucketRegionQueue realBucketRegionQueue = ParallelGatewaySenderHelper + .createBucketRegionQueue(this.cache, this.rootRegion, this.queueRegion, BUCKET_ID); this.bucketRegionQueue = spy(realBucketRegionQueue); // (this.queueRegion.getBucketName(BUCKET_ID), attributes, this.rootRegion, this.cache, ira); EntryEventImpl entryEvent = EntryEventImpl.create(this.bucketRegionQueue, Operation.DESTROY, @@ -312,7 +253,8 @@ public class ParallelQueueRemovalMessageJUnitTest { List<Long> dispatchedKeys = new ArrayList<>(); dispatchedKeys.add(KEY); bucketIdToDispatchedKeys.put(BUCKET_ID, dispatchedKeys); - regionToDispatchedKeys.put(getRegionQueueName(), bucketIdToDispatchedKeys); + regionToDispatchedKeys.put(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID), + bucketIdToDispatchedKeys); return regionToDispatchedKeys; } @@ -327,8 +269,4 @@ public class ParallelQueueRemovalMessageJUnitTest { tempQueueMap.put(BUCKET_ID, tempQueue); return tempQueue; } - - private String getRegionQueueName() { - return Region.SEPARATOR + GATEWAY_SENDER_ID + ParallelGatewaySenderQueue.QSTRING; - } }
