Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-1466 d26b03cf8 -> d7121b5a9 (forced update)
GEODE-1962: Increment notQueuedConflated stat if object unresolved from offheap Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/280d2d8f Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/280d2d8f Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/280d2d8f Branch: refs/heads/feature/GEODE-1466 Commit: 280d2d8f70ed2b0ae533b2c2d0728130133927ce Parents: b2d85b4 Author: Jason Huynh <huyn...@gmail.com> Authored: Wed Oct 5 13:32:19 2016 -0700 Committer: Jason Huynh <huyn...@gmail.com> Committed: Tue Oct 11 11:08:28 2016 -0700 ---------------------------------------------------------------------- .../parallel/ParallelGatewaySenderQueue.java | 35 +++-- .../ParallelGatewaySenderQueueJUnitTest.java | 134 ++++++++++++++++++- 2 files changed, 149 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/280d2d8f/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 1a9b126..04b6c91 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -993,7 +993,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return null; } - private boolean areLocalBucketQueueRegionsPresent() { + protected boolean areLocalBucketQueueRegionsPresent() { boolean bucketsAvailable = false; for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { if (prQ.getDataStore().getAllLocalBucketRegions().size() > 0) @@ -1002,10 +1002,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return false; } - private boolean areLocalBucketQueueRegionsPresent(PartitionedRegion prQ) { - return prQ.getDataStore().isLocalBucketRegionPresent(); - } - private int pickBucketId; protected int getRandomPrimaryBucket(PartitionedRegion prQ) { @@ -1032,8 +1028,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { while(nTry-- > 0) { if(pickBucketId >= thisProcessorBuckets.size()) pickBucketId = 0; - BucketRegionQueue br = (BucketRegionQueue)prQ.getDataStore() - .getLocalBucketById(thisProcessorBuckets.get(pickBucketId++)); + BucketRegionQueue br = getBucketRegionQueueByBucketId(prQ, thisProcessorBuckets.get(pickBucketId++)); if (br != null && br.isReadyForPeek()) { return br.getId(); } @@ -1044,7 +1039,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { /*Collections.shuffle(thisProcessorBuckets); for (Integer bucketId : thisProcessorBuckets) { BucketRegionQueue br = (BucketRegionQueue)prQ.getDataStore() - .getLocalBucketById(bucketId); + .getBucketRegionQueueByBucketId(bucketId); if (br != null && br.isReadyForPeek()) { return br.getId(); @@ -1119,8 +1114,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId) .isPrimary(); if (isPrimary) { - BucketRegionQueue brq = (BucketRegionQueue)prQ.getDataStore() - .getLocalBucketById(bucketId); + BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId); // TODO : Kishor : Make sure we dont need to initalize a bucket // before destroying a key from it try { @@ -1303,6 +1297,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { if (object != null) { GatewaySenderEventImpl copy = object.makeHeapCopyIfOffHeap(); if (copy == null) { + if (stats != null) { + stats.incEventsNotQueuedConflated(); + } continue; } object = copy; @@ -1314,11 +1311,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } batch.add(object); peekedEvents.add(object); - BucketRegionQueue brq = ((BucketRegionQueue)prQ - .getDataStore().getLocalBucketById(bId)); - - //brq.doLockForPrimary(false); - + } else { // If time to wait is -1 (don't wait) or time interval has elapsed long currentTime = System.currentTimeMillis(); @@ -1452,8 +1445,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { protected Object peekAhead(PartitionedRegion prQ, int bucketId) throws CacheException { Object object = null; - BucketRegionQueue brq = ((BucketRegionQueue)prQ - .getDataStore().getLocalBucketById(bucketId)); + BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId); if (logger.isDebugEnabled()) { logger.debug("{}: Peekahead for the bucket {}",this, bucketId); @@ -1475,8 +1467,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } return object; // OFFHEAP: ok since callers are careful to do destroys on region queue after finished with peeked object. } - - + + protected BucketRegionQueue getBucketRegionQueueByBucketId(final PartitionedRegion prQ, final int bucketId) { + return (BucketRegionQueue)prQ + .getDataStore().getLocalBucketById(bucketId); + } + + public int localSize() { int size = 0; for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/280d2d8f/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java index f1d4408..4beace5 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java @@ -19,20 +19,35 @@ package org.apache.geode.internal.cache.wan.parallel; import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.apache.geode.CancelCriterion; import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.AbstractBucketRegionQueue; +import org.apache.geode.internal.cache.BucketRegionQueue; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.PartitionedRegionDataStore; +import org.apache.geode.internal.cache.execute.BucketMovedException; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; +import org.apache.geode.internal.cache.wan.GatewaySenderStats; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue.MetaRegionFactory; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue.ParallelGatewaySenderQueueMetaRegion; import org.apache.geode.test.junit.categories.UnitTest; @@ -43,11 +58,12 @@ public class ParallelGatewaySenderQueueJUnitTest { private ParallelGatewaySenderQueue queue; private MetaRegionFactory metaRegionFactory; private GemFireCacheImpl cache; + private AbstractGatewaySender sender; @Before public void createParallelGatewaySenderQueue() { cache = mock(GemFireCacheImpl.class); - AbstractGatewaySender sender = mock(AbstractGatewaySender.class); + sender = mock(AbstractGatewaySender.class); CancelCriterion cancelCriterion = mock(CancelCriterion.class); when(sender.getCancelCriterion()).thenReturn(cancelCriterion); when(sender.getCache()).thenReturn(cache); @@ -58,6 +74,54 @@ public class ParallelGatewaySenderQueueJUnitTest { } @Test + public void whenGatewayEventUnableToResolveFromOffHeapTheStatForNotQueuedConflatedShouldBeIncremented() throws Exception { + GatewaySenderStats stats = mockGatewaySenderStats(); + + GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class); + when(event.makeHeapCopyIfOffHeap()).thenReturn(null); + GatewaySenderEventImpl eventResolvesFromOffHeap = mock(GatewaySenderEventImpl.class); + when(eventResolvesFromOffHeap.makeHeapCopyIfOffHeap()).thenReturn(eventResolvesFromOffHeap); + Queue backingList = new LinkedList(); + backingList.add(event); + backingList.add(eventResolvesFromOffHeap); + + BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList); + + TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender, Collections.emptySet(), 0, 1, metaRegionFactory); + queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue); + + List peeked = queue.peek(1, 1000); + assertEquals(1, peeked.size()); + verify(stats, times(1)).incEventsNotQueuedConflated(); + } + + private GatewaySenderStats mockGatewaySenderStats() { + GatewaySenderStats stats = mock(GatewaySenderStats.class); + when(sender.getStatistics()).thenReturn(stats); + return stats; + } + + @Test + public void whenNullPeekedEventFromBucketRegionQueueTheStatForNotQueuedConflatedShouldBeIncremented() throws Exception { + GatewaySenderStats stats = mockGatewaySenderStats(); + + GatewaySenderEventImpl eventResolvesFromOffHeap = mock(GatewaySenderEventImpl.class); + when(eventResolvesFromOffHeap.makeHeapCopyIfOffHeap()).thenReturn(eventResolvesFromOffHeap); + Queue backingList = new LinkedList(); + backingList.add(null); + backingList.add(eventResolvesFromOffHeap); + + BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList); + + TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender, Collections.emptySet(), 0, 1, metaRegionFactory); + queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue); + + List peeked = queue.peek(1, 1000); + assertEquals(1, peeked.size()); + verify(stats, times(1)).incEventsNotQueuedConflated(); + } + + @Test public void testLocalSize() throws Exception { ParallelGatewaySenderQueueMetaRegion mockMetaRegion = mock(ParallelGatewaySenderQueueMetaRegion.class); PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class); @@ -79,5 +143,73 @@ public class ParallelGatewaySenderQueueJUnitTest { when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION); return region; } + + private BucketRegionQueue mockBucketRegionQueue(final Queue backingList) { + PartitionedRegion mockBucketRegion = mockPR("bucketRegion"); + //These next mocked return calls are for when peek is called. It ends up checking these on the mocked pr region + when(mockBucketRegion.getLocalMaxMemory()).thenReturn(100); + when(mockBucketRegion.size()).thenReturn(backingList.size()); + + BucketRegionQueue bucketRegionQueue = mock(BucketRegionQueue.class); + when (bucketRegionQueue.getPartitionedRegion()).thenReturn(mockBucketRegion); + when(bucketRegionQueue.peek()).thenAnswer((Answer) invocation -> backingList.poll()); + return bucketRegionQueue; + } + + + + private class TestableParallelGatewaySenderQueue extends ParallelGatewaySenderQueue { + + private BucketRegionQueue mockedAbstractBucketRegionQueue; + + public TestableParallelGatewaySenderQueue(final AbstractGatewaySender sender, + final Set<Region> userRegions, + final int idx, + final int nDispatcher) { + super(sender, userRegions, idx, nDispatcher); + } + + public TestableParallelGatewaySenderQueue(final AbstractGatewaySender sender, + final Set<Region> userRegions, + final int idx, + final int nDispatcher, + final MetaRegionFactory metaRegionFactory) { + super(sender, userRegions, idx, nDispatcher, metaRegionFactory); + } + + + public void setMockedAbstractBucketRegionQueue(BucketRegionQueue mocked) { + this.mockedAbstractBucketRegionQueue = mocked; + } + + public AbstractBucketRegionQueue getBucketRegion(final PartitionedRegion prQ, final int bucketId) { + return mockedAbstractBucketRegionQueue; + } + + @Override + public boolean areLocalBucketQueueRegionsPresent() { + return true; + } + + @Override + protected PartitionedRegion getRandomShadowPR() { + return mockedAbstractBucketRegionQueue.getPartitionedRegion(); + } + + @Override + protected int getRandomPrimaryBucket(PartitionedRegion pr) { + return 0; + } + + @Override + protected BucketRegionQueue getBucketRegionQueueByBucketId(PartitionedRegion prQ, int bucketId) { + return mockedAbstractBucketRegionQueue; + } + +// @Override +// public int localSizeForProcessor() { +// return 1; +// } + } }