This is an automated email from the ASF dual-hosted git repository. mhanson pushed a commit to branch release/1.11.0 in repository https://gitbox.apache.org/repos/asf/geode.git
commit e87b763afe5bcd12a61c978314410d0e4eadd6d4 Author: Eric Shu <[email protected]> AuthorDate: Tue Dec 17 12:57:19 2019 -0800 GEODE-7537: use dm.getCache instead of CacheFactory.getAnyInstance (#4486) (cherry picked from commit 53f851c94b6ee2e62e8e16f3cd73e77cec54eb19) --- ...aySenderQueueEntrySynchronizationOperation.java | 21 +++---- .../wan/parallel/ParallelGatewaySenderQueue.java | 12 ++-- ...nderQueueEntrySynchronizationOperationTest.java | 71 ++++++++++++++++++++++ .../ParallelGatewaySenderQueueJUnitTest.java | 28 +++++++++ 4 files changed, 114 insertions(+), 18 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java index 08d1e5b..c13b3c5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java @@ -27,7 +27,6 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.DataSerializer; import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.wan.GatewayQueueEvent; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.distributed.internal.ClusterDistributionManager; @@ -41,6 +40,7 @@ import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InitialImageOperation; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.InternalRegion; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.versions.VersionTag; @@ -95,7 +95,7 @@ public class GatewaySenderQueueEntrySynchronizationOperation { } protected GemFireCacheImpl getCache() { - return (GemFireCacheImpl) CacheFactory.getAnyInstance(); + return (GemFireCacheImpl) region.getDistributionManager().getCache(); } private void initializeEntriesToSynchronize( @@ -163,8 +163,8 @@ public class GatewaySenderQueueEntrySynchronizationOperation { } } - private Cache getCache() { - return CacheFactory.getAnyInstance(); + Cache getCache() { + return dmgr.getCache(); } } @@ -198,7 +198,7 @@ public class GatewaySenderQueueEntrySynchronizationOperation { logger.debug("{}: Providing synchronization region={}; entriesToSynchronize={}", getClass().getSimpleName(), this.regionPath, this.entriesToSynchronize); } - result = getSynchronizationEvents(); + result = getSynchronizationEvents(dm.getCache()); } catch (Throwable t) { replyException = new ReplyException(t); } finally { @@ -218,15 +218,14 @@ public class GatewaySenderQueueEntrySynchronizationOperation { } } - private Object getSynchronizationEvents() { + private Object getSynchronizationEvents(InternalCache cache) { List<Map<String, GatewayQueueEvent>> results = new ArrayList<>(); // Get the region - GemFireCacheImpl gfci = (GemFireCacheImpl) getCache(); - LocalRegion region = (LocalRegion) gfci.getRegion(this.regionPath); + LocalRegion region = (LocalRegion) cache.getRegion(this.regionPath); // Add the appropriate GatewaySenderEventImpl from each GatewaySender for each entry Set<String> allGatewaySenderIds = region.getAllGatewaySenderIds(); - for (GatewaySender sender : gfci.getAllGatewaySenders()) { + for (GatewaySender sender : cache.getAllGatewaySenders()) { if (allGatewaySenderIds.contains(sender.getId())) { for (GatewaySenderQueueEntrySynchronizationEntry entry : this.entriesToSynchronize) { Map<String, GatewayQueueEvent> resultForOneEntry = new HashMap<>(); @@ -243,10 +242,6 @@ public class GatewaySenderQueueEntrySynchronizationOperation { return results; } - private Cache getCache() { - return CacheFactory.getAnyInstance(); - } - @Override public int getDSFID() { return GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 35f3b20..1b26b60 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -659,9 +659,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { boolean putDone = false; // Can this region ever be null? Should we work with regionName and not with region // instance. - // It can't be as put is happeing on the region and its still under process + // It can't be as put is happening on the region and its still under process GatewaySenderEventImpl value = (GatewaySenderEventImpl) object; - boolean isDREvent = isDREvent(value); + + boolean isDREvent = isDREvent(sender.getCache(), value); Region region = value.getRegion(); String regionPath = null; @@ -922,8 +923,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return prQ; } - private boolean isDREvent(GatewaySenderEventImpl event) { - return (event.getRegion() instanceof DistributedRegion) ? true : false; + boolean isDREvent(InternalCache cache, GatewaySenderEventImpl event) { + Region region = cache.getRegion(event.getRegionPath()); + return region instanceof DistributedRegion; } /** @@ -1001,7 +1003,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { int bucketId = -1; Object key = null; if (event.getRegion() != null) { - if (isDREvent(event)) { + if (isDREvent(sender.getCache(), event)) { prQ = this.userRegionNameToshadowPRMap.get(event.getRegion().getFullPath()); bucketId = event.getEventId().getBucketID(); key = event.getEventId(); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperationTest.java new file mode 100644 index 0000000..e9f0466 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperationTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InitialImageOperation; +import org.apache.geode.internal.cache.InternalRegion; + +public class GatewaySenderQueueEntrySynchronizationOperationTest { + private DistributionManager distributionManager; + private InternalDistributedMember recipient; + private GatewaySenderQueueEntrySynchronizationOperation operation; + private InternalRegion region; + private GemFireCacheImpl cache; + + @Before + public void setup() { + distributionManager = mock(DistributionManager.class, RETURNS_DEEP_STUBS); + recipient = mock(InternalDistributedMember.class); + region = mock(InternalRegion.class); + cache = mock(GemFireCacheImpl.class); + } + + @Test + public void ReplyProcessorGetCacheDelegateToDistributionManager() { + operation = mock(GatewaySenderQueueEntrySynchronizationOperation.class); + GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationReplyProcessor processor = + new GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationReplyProcessor( + distributionManager, recipient, operation); + when(distributionManager.getCache()).thenReturn(cache); + + assertThat(processor.getCache()).isEqualTo(cache); + } + + @Test + public void GatewaySenderQueueEntrySynchronizationOperationGetCacheDelegateToDistributionManager() { + InitialImageOperation.Entry entry = mock(InitialImageOperation.Entry.class); + List<InitialImageOperation.Entry> list = new ArrayList<>(); + list.add(entry); + operation = new GatewaySenderQueueEntrySynchronizationOperation(recipient, region, list); + when(region.getDistributionManager()).thenReturn(distributionManager); + when(distributionManager.getCache()).thenReturn(cache); + + assertThat(operation.getCache()).isEqualTo(cache); + } +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java index d03a15c..88c3e45 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java @@ -14,6 +14,7 @@ */ package org.apache.geode.internal.cache.wan.parallel; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; @@ -39,6 +40,7 @@ import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; import org.apache.geode.internal.cache.AbstractBucketRegionQueue; import org.apache.geode.internal.cache.BucketRegionQueue; +import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.PartitionedRegionDataStore; @@ -155,6 +157,32 @@ public class ParallelGatewaySenderQueueJUnitTest { assertEquals(3, queue.localSize()); } + @Test + public void isDREventReturnsTrueForDistributedRegionEvent() { + String regionPath = "regionPath"; + GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class); + when(event.getRegionPath()).thenReturn(regionPath); + DistributedRegion region = mock(DistributedRegion.class); + when(cache.getRegion(regionPath)).thenReturn(region); + ParallelGatewaySenderQueue queue = mock(ParallelGatewaySenderQueue.class); + when(queue.isDREvent(cache, event)).thenCallRealMethod(); + + assertThat(queue.isDREvent(cache, event)).isTrue(); + } + + @Test + public void isDREventReturnsFalseForPartitionedRegionEvent() { + String regionPath = "regionPath"; + GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class); + when(event.getRegionPath()).thenReturn(regionPath); + PartitionedRegion region = mock(PartitionedRegion.class); + when(cache.getRegion(regionPath)).thenReturn(region); + ParallelGatewaySenderQueue queue = mock(ParallelGatewaySenderQueue.class); + when(queue.isDREvent(cache, event)).thenCallRealMethod(); + + assertThat(queue.isDREvent(cache, event)).isFalse(); + } + private PartitionedRegion mockPR(String name) { PartitionedRegion region = mock(PartitionedRegion.class); when(region.getFullPath()).thenReturn(name);
