Repository: incubator-geode
Updated Branches:
  refs/heads/develop 769d9b3ae -> 9189e3bb9
GEODE-1926

Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7b88c6cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7b88c6cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7b88c6cf

Branch: refs/heads/develop
Commit: 7b88c6cf7ad96675f81a138b2d03f8f3375168cf
Parents: 769d9b3
Author: nabarun <[email protected]>
Authored: Thu Oct 6 16:06:31 2016 -0700
Committer: nabarun <[email protected]>
Committed: Fri Oct 7 10:00:21 2016 -0700

----------------------------------------------------------------------
 .../wan/serial/SerialGatewaySenderQueue.java    |  56 ++++++----
 .../serial/SerialWANConflationDUnitTest.java    | 111 +++++++++++++++++++
 2 files changed, 148 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b88c6cf/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index a8bb72d..28f5f83 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -513,13 +513,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
       //resetLastPeeked();
       while (batch.size() < size) {
         AsyncEvent object = peekAhead();
-        if (object != null && object instanceof GatewaySenderEventImpl) {
-          GatewaySenderEventImpl copy = ((GatewaySenderEventImpl)object).makeHeapCopyIfOffHeap();
-          if (copy == null) {
-            continue;
-          }
-          object = copy;
-        }
         // Conflate here
         if (object != null) {
           batch.add(object);
@@ -759,17 +752,42 @@ public class SerialGatewaySenderQueue implements RegionQueue {
    *
    * @throws CacheException
    */
-  private AsyncEvent peekAhead() throws CacheException {
-    AsyncEvent object = null;
-    long currentKey = -1;
+
+  private Long getCurrentKey(){
+    long currentKey;
     if (this.peekedIds.isEmpty()) {
-      currentKey = getHeadKey();
+      currentKey = getHeadKey();
     } else {
-      Long lastPeek = this.peekedIds.peekLast();
-      if (lastPeek == null) {
-        return null;
-      }
-      currentKey = lastPeek.longValue() + 1;
+      Long lastPeek = this.peekedIds.peekLast();
+      if (lastPeek == null) {
+        return null;
+      }
+      currentKey = lastPeek.longValue() + 1;
+    }
+    return currentKey;
+  }
+
+  private AsyncEvent getObjectInSerialSenderQueue(Long currentKey) {
+    AsyncEvent object = optimalGet(currentKey);
+    if ((null != object) && logger.isDebugEnabled()) {
+      logger.debug("{}: Peeked {}->{}", this, currentKey, object);
+    }
+    if (object != null && object instanceof GatewaySenderEventImpl) {
+      GatewaySenderEventImpl copy = ((GatewaySenderEventImpl)object).makeHeapCopyIfOffHeap();
+      if (copy == null) {
+        logger.debug("Unable to make heap copy and will not be added to peekedIds for object" +
+            " : {} ",object.toString());
+      }
+      object = copy;
+    }
+    return object;
+  }
+
+  private AsyncEvent peekAhead() throws CacheException {
+    AsyncEvent object = null;
+    Long currentKey = getCurrentKey();
+    if(currentKey == null ){
+      return null;
     }


@@ -788,12 +806,12 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     // does not save anything since GatewayBatchOp needs to GatewayEventImpl
     // in object form.
     while (before(currentKey, getTailKey())
-    // use optimalGet here to fix bug 40654
-        && (object = optimalGet(Long.valueOf(currentKey))) == null) {
+        && (null == (object = getObjectInSerialSenderQueue(currentKey)))) {
       if (logger.isTraceEnabled()) {
         logger.trace("{}: Trying head key + offset: {}", this, currentKey);
       }
       currentKey = inc(currentKey);
+      object = getObjectInSerialSenderQueue(currentKey);
       if (this.stats != null) {
         this.stats.incEventsNotQueuedConflated();
       }
@@ -803,7 +821,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
       logger.debug("{}: Peeked {}->{}", this, currentKey, object);
     }
     if (object != null) {
-      this.peekedIds.add(Long.valueOf(currentKey));
+      this.peekedIds.add(currentKey);
     }
     return object;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b88c6cf/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
new file mode 100644
index 0000000..82d1067
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.wan.serial;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.jayway.awaitility.Awaitility;
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.wan.WANTestBase;
+
+public class SerialWANConflationDUnitTest extends WANTestBase{
+
+  @Test
+  public void testSerialPropagationPartitionRegionBatchConflation() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() ->createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() ->createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    vm2.invoke(() ->createPartitionedRegion(
+        getTestMethodName(), null, 1, 8, isOffHeap() ));
+    vm3.invoke(() ->createPartitionedRegion(
+        getTestMethodName(), null, 1, 8, isOffHeap() ));
+
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() ->createPartitionedRegion(
+        getTestMethodName(), "ln", 0, 8, isOffHeap() ));
+    vm5.invoke(() ->createPartitionedRegion(
+        getTestMethodName(), "ln", 0, 8, isOffHeap() ));
+    vm6.invoke(() ->createPartitionedRegion(
+        getTestMethodName(), "ln", 0, 8, isOffHeap() ));
+    vm7.invoke(() ->createPartitionedRegion(
+        getTestMethodName(), "ln", 0, 8, isOffHeap() ));
+
+    vm4.invoke(() ->createSender( "ln", 2,
+        false, 100, 50, false, false, null, true ));
+    vm5.invoke(() ->createSender( "ln", 2,
+        false, 100, 50, false, false, null, true ));
+    vm6.invoke(() ->createSender( "ln", 2,
+        false, 100, 50, false, false, null, true ));
+    vm7.invoke(() ->createSender( "ln", 2,
+        false, 100, 50, false, false, null, true ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() ->pauseSender( "ln" ));
+    vm5.invoke(() ->pauseSender( "ln" ));
+    vm6.invoke(() ->pauseSender( "ln" ));
+    vm7.invoke(() ->pauseSender( "ln" ));
+
+
+    final Map keyValues = new HashMap();
+
+    for (int i = 1; i <= 10; i++) {
+      for (int j = 1; j <= 10; j++) {
+        keyValues.put(j, i) ;
+      }
+      vm4.invoke(() ->putGivenKeyValue(
+          getTestMethodName(), keyValues ));
+    }
+
+    vm4.invoke(() ->enableConflation( "ln" ));
+    vm5.invoke(() ->enableConflation( "ln" ));
+    vm6.invoke(() ->enableConflation( "ln" ));
+    vm7.invoke(() ->enableConflation( "ln" ));
+
+    vm4.invoke(() ->resumeSender( "ln" ));
+    vm5.invoke(() ->resumeSender( "ln" ));
+    vm6.invoke(() ->resumeSender( "ln" ));
+    vm7.invoke(() ->resumeSender( "ln" ));
+
+    ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() ->
+        WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() ->
+        WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() ->
+        WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() ->
+        WANTestBase.getSenderStats( "ln", 0 ));
+
+    assertTrue("No events conflated in batch", (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
+
+    vm2.invoke(() ->validateRegionSize(
+        getTestMethodName(), 10 ));
+
+  }
+
+}
