GEODE-2027: ParallelQueueRemovalMessage processing removes events from the region and temp queue
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b10a171e Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b10a171e Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b10a171e Branch: refs/heads/feature/GEM-983 Commit: b10a171e11990a566ed42560a903668908131390 Parents: 87f2fb5 Author: Barry Oglesby <[email protected]> Authored: Fri Oct 21 12:40:41 2016 -0700 Committer: Barry Oglesby <[email protected]> Committed: Thu Oct 27 09:17:35 2016 -0700 ---------------------------------------------------------------------- .../cache/wan/AbstractGatewaySender.java | 2 +- .../parallel/ParallelQueueRemovalMessage.java | 22 +- .../internal/cache/BucketRegionQueueHelper.java | 58 +++++ .../ParallelQueueRemovalMessageJUnitTest.java | 256 +++++++++++++++++++ .../java/org/apache/geode/test/fake/Fakes.java | 5 +- 5 files changed, 326 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 33119bc..e1c9010 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -684,7 +684,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return null; } - final public Set<RegionQueue> getQueues() { + public Set<RegionQueue> getQueues() { if (this.eventProcessor != null) { if (!(this.eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor)) { Set<RegionQueue> queues = new HashSet<RegionQueue>(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java index a363b5d..bad3d3c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java @@ -135,22 +135,14 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage { afterAckForSecondary_EventInBucket(abstractSender, brq, key); destroyKeyFromBucketQueue(brq, key, region); isDestroyed = true; - } else { - // if BucketRegionQueue does not have the key, it - // should be in tempQueue - // remove it from there..defect #49196 - isDestroyed = - destroyFromTempQueue(brq.getPartitionedRegion(), (Integer) bId, key); - } - if (!isDestroyed) { - // event is neither destroyed from BucketRegionQueue nor from tempQueue - brq.addToFailedBatchRemovalMessageKeys(key); - if (isDebugEnabled) { - logger.debug( - "Event is neither destroyed from BucketRegionQueue not from tempQueue. Added to failedBatchRemovalMessageKeys: {}", - key); - } } + + // Even if BucketRegionQueue does not have the key, it could be in the tempQueue + // remove it from there..defect #49196 + destroyFromTempQueue(brq.getPartitionedRegion(), (Integer) bId, key); + + // Finally, add the key to the failed batch removal keys so that it is definitely removed from the bucket region queue + brq.addToFailedBatchRemovalMessageKeys(key); } finally { brq.getInitializationLock().readLock().unlock(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java new file mode 100644 index 0000000..68b29c2 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * + */ +package org.apache.geode.internal.cache; + +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Helper class in the internal cache package to access protected BucketRegionQueue methods. + */ +public class BucketRegionQueueHelper { + + private BucketRegionQueue bucketRegionQueue; + + public BucketRegionQueueHelper(GemFireCacheImpl cache, PartitionedRegion queueRegion, BucketRegionQueue bucketRegionQueue) { + this.bucketRegionQueue = bucketRegionQueue; + initialize(cache, queueRegion); + } + + public GatewaySenderEventImpl addEvent(Object key) { + this.bucketRegionQueue.getEventTracker().setInitialized(); + this.bucketRegionQueue.entries.disableLruUpdateCallback(); + GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class); + this.bucketRegionQueue.entries.initialImagePut(key, 0, event, false, false, null, null, false); + this.bucketRegionQueue.entries.enableLruUpdateCallback(); + return event; + } + + public void cleanUpDestroyedTokensAndMarkGIIComplete() { + this.bucketRegionQueue.cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII); + } + + public void initialize(GemFireCacheImpl cache, PartitionedRegion queueRegion) { + InternalDistributedMember member = cache.getMyId(); + when(queueRegion.getMyId()).thenReturn(member); + when(cache.getRegionByPath(this.bucketRegionQueue.getFullPath())).thenReturn(this.bucketRegionQueue); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java new file mode 100644 index 0000000..cc9caaf --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.internal.cache.wan.parallel; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.*; +import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.lru.LRUAlgorithm; +import org.apache.geode.internal.cache.partitioned.RegionAdvisor; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; +import org.apache.geode.test.fake.Fakes; +import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Category(UnitTest.class) +public class ParallelQueueRemovalMessageJUnitTest { + + private GemFireCacheImpl cache; + private PartitionedRegion queueRegion; + private AbstractGatewaySender sender; + private PartitionedRegion rootRegion; + private BucketRegionQueue bucketRegionQueue; + private BucketRegionQueueHelper bucketRegionQueueHelper; + + private static String GATEWAY_SENDER_ID = "ny"; + private static int BUCKET_ID = 85; + private static long KEY = 198l; + + @Before + public void setUpGemFire() { + createCache(); + createQueueRegion(); + createGatewaySender(); + createRootRegion(); + createBucketRegionQueue(); + } + + private void createCache() { + // Mock cache + this.cache = Fakes.cache(); + GemFireCacheImpl.setInstanceForTests(this.cache); + } + + private void createQueueRegion() { + // Mock queue region + this.queueRegion = mock(PartitionedRegion.class); + when(this.queueRegion.getFullPath()).thenReturn(getRegionQueueName()); + when(this.queueRegion.getPrStats()).thenReturn(mock(PartitionedRegionStats.class)); + when(this.queueRegion.getDataStore()).thenReturn(mock(PartitionedRegionDataStore.class)); + when(this.queueRegion.getCache()).thenReturn(this.cache); + EvictionAttributesImpl ea = (EvictionAttributesImpl) EvictionAttributes.createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK); + LRUAlgorithm algorithm = ea.createEvictionController(this.queueRegion, false); + algorithm.getLRUHelper().initStats(this.queueRegion, this.cache.getDistributedSystem()); + when(this.queueRegion.getEvictionController()).thenReturn(algorithm); + } + + private void createGatewaySender() { + // Mock gateway sender + this.sender = mock(AbstractGatewaySender.class); + when(this.queueRegion.getParallelGatewaySender()).thenReturn(this.sender); + when(this.sender.getQueues()).thenReturn(null); + when(this.sender.getDispatcherThreads()).thenReturn(1); + when(this.sender.getCache()).thenReturn(this.cache); + CancelCriterion cancelCriterion = mock(CancelCriterion.class); + when(sender.getCancelCriterion()).thenReturn(cancelCriterion); + } + + private void createRootRegion() { + // Mock root region + this.rootRegion = mock(PartitionedRegion.class); + when(this.rootRegion.getFullPath()).thenReturn(Region.SEPARATOR+PartitionedRegionHelper.PR_ROOT_REGION_NAME); + when(this.cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(this.rootRegion); + when(this.cache.getRegion(getRegionQueueName(), false)).thenReturn(this.queueRegion); + } + + private void createBucketRegionQueue() { + // Create InternalRegionArguments + InternalRegionArguments ira = new InternalRegionArguments(); + ira.setPartitionedRegion(this.queueRegion); + ira.setPartitionedRegionBucketRedundancy(1); + BucketAdvisor ba = mock(BucketAdvisor.class); + ira.setBucketAdvisor(ba); + InternalRegionArguments pbrIra = new InternalRegionArguments(); + RegionAdvisor ra = mock(RegionAdvisor.class); + when(ra.getPartitionedRegion()).thenReturn(this.queueRegion); + pbrIra.setPartitionedRegionAdvisor(ra); + PartitionAttributes pa = mock(PartitionAttributes.class); + when(this.queueRegion.getPartitionAttributes()).thenReturn(pa); + when(this.queueRegion.getDataPolicy()).thenReturn(DataPolicy.PARTITION); + when(pa.getColocatedWith()).thenReturn(null); + ProxyBucketRegion pbr = new ProxyBucketRegion(BUCKET_ID, this.queueRegion, pbrIra); // final classes cannot be mocked + when(ba.getProxyBucketRegion()).thenReturn(pbr); + + // Create RegionAttributes + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setDataPolicy(DataPolicy.REPLICATE); + factory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK)); + RegionAttributes attributes = factory.create(); + + // Create BucketRegionQueue + this.bucketRegionQueue = new BucketRegionQueue(this.queueRegion.getBucketName(BUCKET_ID), attributes, this.rootRegion, this.cache, ira); + this.bucketRegionQueueHelper = new BucketRegionQueueHelper(this.cache, this.queueRegion, this.bucketRegionQueue); + } + + @After + public void tearDownGemFire() { + GemFireCacheImpl.setInstanceForTests(null); + } + + @Test + public void validateFailedBatchRemovalMessageKeysInUninitializedBucketRegionQueue() throws Exception { + // Validate initial BucketRegionQueue state + assertFalse(this.bucketRegionQueue.isInitialized()); + assertEquals(0, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size()); + + // Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to add a key) + createAndProcessParallelQueueRemovalMessage(); + + // Validate BucketRegionQueue after processing ParallelQueueRemovalMessage + assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size()); + } + + @Test + public void validateDestroyKeyFromBucketQueueInUninitializedBucketRegionQueue() throws Exception { + // Validate initial BucketRegionQueue state + assertEquals(0, this.bucketRegionQueue.size()); + assertFalse(this.bucketRegionQueue.isInitialized()); + + // Add an event to the BucketRegionQueue and verify BucketRegionQueue state + this.bucketRegionQueueHelper.addEvent(KEY); + assertEquals(1, this.bucketRegionQueue.size()); + + // Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to DESTROYED) + when(this.queueRegion.getKeyInfo(KEY, null, null)).thenReturn(new KeyInfo(KEY, null, null)); + createAndProcessParallelQueueRemovalMessage(); + + // Clean up destroyed tokens and validate BucketRegionQueue + this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete(); + assertEquals(0, this.bucketRegionQueue.size()); + } + + @Test + public void validateDestroyFromTempQueueInUninitializedBucketRegionQueue() throws Exception { + // Validate initial BucketRegionQueue state + assertFalse(this.bucketRegionQueue.isInitialized()); + + // Create a real ConcurrentParallelGatewaySenderQueue + ParallelGatewaySenderEventProcessor pgsep = createConcurrentParallelGatewaySenderQueue(); + + // Add a mock GatewaySenderEventImpl to the temp queue + BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(pgsep, mock(GatewaySenderEventImpl.class)); + assertEquals(1, tempQueue.size()); + + // Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to add a key) + createAndProcessParallelQueueRemovalMessage(); + + // Validate temp queue is empty after processing ParallelQueueRemovalMessage + assertEquals(0, tempQueue.size()); + } + + @Test + public void validateDestroyFromBucketQueueAndTempQueueInUninitializedBucketRegionQueue() { + // Validate initial BucketRegionQueue state + assertFalse(this.bucketRegionQueue.isInitialized()); + assertEquals(0, this.bucketRegionQueue.size()); + + // Create a real ConcurrentParallelGatewaySenderQueue + ParallelGatewaySenderEventProcessor pgsep = createConcurrentParallelGatewaySenderQueue(); + + // Add an event to the BucketRegionQueue and verify BucketRegionQueue state + GatewaySenderEventImpl gsei = this.bucketRegionQueueHelper.addEvent(KEY); + assertEquals(1, this.bucketRegionQueue.size()); + + // Add a mock GatewaySenderEventImpl to the temp queue + BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(pgsep, gsei); + assertEquals(1, tempQueue.size()); + + // Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to DESTROYED) + when(this.queueRegion.getKeyInfo(KEY, null, null)).thenReturn(new KeyInfo(KEY, null, null)); + createAndProcessParallelQueueRemovalMessage(); + + // Validate temp queue is empty after processing ParallelQueueRemovalMessage + assertEquals(0, tempQueue.size()); + + // Clean up destroyed tokens + this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete(); + + // Validate BucketRegionQueue is empty after processing ParallelQueueRemovalMessage + assertEquals(0, this.bucketRegionQueue.size()); + } + + private void createAndProcessParallelQueueRemovalMessage() { + ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(createRegionToDispatchedKeysMap()); + pqrm.process(null); + } + + private HashMap<String, Map<Integer, List<Long>>> createRegionToDispatchedKeysMap() { + HashMap<String, Map<Integer, List<Long>>> regionToDispatchedKeys = new HashMap<>(); + Map<Integer, List<Long>> bucketIdToDispatchedKeys = new HashMap<>(); + List<Long> dispatchedKeys = new ArrayList<>(); + dispatchedKeys.add(KEY); + bucketIdToDispatchedKeys.put(BUCKET_ID, dispatchedKeys); + regionToDispatchedKeys.put(getRegionQueueName(), bucketIdToDispatchedKeys); + return regionToDispatchedKeys; + } + + private ParallelGatewaySenderEventProcessor createConcurrentParallelGatewaySenderQueue() { + ParallelGatewaySenderEventProcessor pgsep = new ParallelGatewaySenderEventProcessor(sender); + ConcurrentParallelGatewaySenderQueue cpgsq = new ConcurrentParallelGatewaySenderQueue(sender, new ParallelGatewaySenderEventProcessor[] {pgsep}); + Set<RegionQueue> queues = new HashSet<>(); + queues.add(cpgsq); + when(this.sender.getQueues()).thenReturn(queues); + return pgsep; + } + + private BlockingQueue<GatewaySenderEventImpl> createTempQueueAndAddEvent(ParallelGatewaySenderEventProcessor pgsep, GatewaySenderEventImpl gsei) { + ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) pgsep.getQueue(); + Map<Integer, BlockingQueue<GatewaySenderEventImpl>> tempQueueMap = pgsq.getBucketToTempQueueMap(); + BlockingQueue<GatewaySenderEventImpl> tempQueue = new LinkedBlockingQueue(); + when(gsei.getShadowKey()).thenReturn(KEY); + tempQueue.add(gsei); + tempQueueMap.put(BUCKET_ID, tempQueue); + return tempQueue; + } + + private String getRegionQueueName() { + return Region.SEPARATOR+GATEWAY_SENDER_ID+ ParallelGatewaySenderQueue.QSTRING; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java b/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java index 93195c1..2ab64dd 100644 --- a/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java +++ b/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java @@ -26,7 +26,7 @@ import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.cache.AbstractRegion; +import org.apache.geode.internal.cache.CachePerfStats; import org.apache.geode.internal.cache.GemFireCacheImpl; import java.io.File; @@ -74,9 +74,11 @@ public class Fakes { when(config.getDeployWorkingDir()).thenReturn(new File(".")); when(cache.getDistributedSystem()).thenReturn(system); + when(cache.getSystem()).thenReturn(system); when(cache.getMyId()).thenReturn(member); when(cache.getDistributionManager()).thenReturn(distributionManager); when(cache.getCancelCriterion()).thenReturn(systemCancelCriterion); + when(cache.getCachePerfStats()).thenReturn(mock(CachePerfStats.class)); when(system.getDistributedMember()).thenReturn(member); when(system.getConfig()).thenReturn(config); @@ -88,6 +90,7 @@ public class Fakes { when(system.createAtomicStatistics(any(), any())).thenReturn(stats); when(distributionManager.getId()).thenReturn(member); + when(distributionManager.getDistributionManagerId()).thenReturn(member); when(distributionManager.getConfig()).thenReturn(config); when(distributionManager.getSystem()).thenReturn(system); when(distributionManager.getCancelCriterion()).thenReturn(systemCancelCriterion);
