GEODE-3470: Increased serial gateway sender token timeout
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/6a17c9b1 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/6a17c9b1 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/6a17c9b1 Branch: refs/heads/feature/GEODE-3416 Commit: 6a17c9b1b2f4c3148afc88829c40eaf355e4293a Parents: 7038eb9 Author: Barry Oglesby <[email protected]> Authored: Fri Aug 18 16:41:26 2017 -0700 Committer: Barry Oglesby <[email protected]> Committed: Mon Aug 21 14:03:01 2017 -0700 ---------------------------------------------------------------------- .../cache/wan/AbstractGatewaySender.java | 2 +- .../SerialGatewaySenderEventProcessor.java | 4 +- .../ParallelQueueRemovalMessageJUnitTest.java | 11 +- ...ialGatewaySenderEventProcessorJUnitTest.java | 114 +++++++++++++++++++ .../TestSerialGatewaySenderEventProcessor.java | 32 ++++++ 5 files changed, 151 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/6a17c9b1/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index c38d547..2154ffe 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -188,7 +188,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi Integer.getInteger("GatewaySender.QUEUE_SIZE_THRESHOLD", 5000).intValue(); public static int TOKEN_TIMEOUT = - Integer.getInteger("GatewaySender.TOKEN_TIMEOUT", 15000).intValue(); + Integer.getInteger("GatewaySender.TOKEN_TIMEOUT", 120000).intValue(); /** * The name of the DistributedLockService used when accessing the GatewaySender's meta data http://git-wip-us.apache.org/repos/asf/geode/blob/6a17c9b1/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 0aa0ed9..150b5ac 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -84,7 +84,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven * to keep track. Note: unprocessedEventsLock MUST be synchronized before using this map. This is * not a cut and paste error. sync unprocessedEventsLock when using unprocessedTokens. */ - private Map<EventID, Long> unprocessedTokens; + protected Map<EventID, Long> unprocessedTokens; private ExecutorService executor; @@ -98,7 +98,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven * When the Number of unchecked events exceeds this threshold and the number of tokens in the map * exceeds this threshold then a check will be done for old tokens. */ - static private final int REAP_THRESHOLD = 1000; + static protected final int REAP_THRESHOLD = 1000; /* * How many events have happened without a reap check being done? http://git-wip-us.apache.org/repos/asf/geode/blob/6a17c9b1/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java index e45a06b..1a49cfd 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java @@ -28,7 +28,6 @@ import java.util.concurrent.LinkedBlockingQueue; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.invocation.InvocationOnMock; @@ -49,7 +48,6 @@ import org.apache.geode.internal.cache.BucketAdvisor; import org.apache.geode.internal.cache.BucketRegionQueue; import org.apache.geode.internal.cache.BucketRegionQueueHelper; import org.apache.geode.internal.cache.EntryEventImpl; -import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.EvictionAttributesImpl; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalRegionArguments; @@ -156,10 +154,7 @@ public class ParallelQueueRemovalMessageJUnitTest { when(pa.getColocatedWith()).thenReturn(null); - // classes cannot be mocked - ProxyBucketRegion pbr = new ProxyBucketRegion(BUCKET_ID, this.queueRegion, pbrIra); - - when(ba.getProxyBucketRegion()).thenReturn(pbr); + when(ba.getProxyBucketRegion()).thenReturn(mock(ProxyBucketRegion.class)); // Create RegionAttributes AttributesFactory factory = new AttributesFactory(); @@ -175,7 +170,7 @@ public class ParallelQueueRemovalMessageJUnitTest { this.bucketRegionQueue = spy(realBucketRegionQueue); // (this.queueRegion.getBucketName(BUCKET_ID), attributes, this.rootRegion, this.cache, ira); EntryEventImpl entryEvent = EntryEventImpl.create(this.bucketRegionQueue, Operation.DESTROY, - mock(EventID.class), "value", null, false, mock(DistributedMember.class)); + KEY, "value", null, false, mock(DistributedMember.class)); doReturn(entryEvent).when(this.bucketRegionQueue).newDestroyEntryEvent(any(), any()); // when(this.bucketRegionQueue.newDestroyEntryEvent(any(), any())).thenReturn(); @@ -203,7 +198,6 @@ public class ParallelQueueRemovalMessageJUnitTest { assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size()); } - @Ignore @Test public void validateDestroyKeyFromBucketQueueInUninitializedBucketRegionQueue() throws Exception { // Validate initial BucketRegionQueue state @@ -245,7 +239,6 @@ public class ParallelQueueRemovalMessageJUnitTest { assertEquals(0, tempQueue.size()); } - @Ignore @Test public void validateDestroyFromBucketQueueAndTempQueueInUninitializedBucketRegionQueue() { // Validate initial BucketRegionQueue state http://git-wip-us.apache.org/repos/asf/geode/blob/6a17c9b1/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java new file mode 100644 index 0000000..f21634e --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.serial; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Operation; +import org.apache.geode.internal.cache.EntryEventImpl; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; +import org.apache.geode.internal.cache.wan.GatewaySenderStats; +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class SerialGatewaySenderEventProcessorJUnitTest { + + private AbstractGatewaySender sender; + + private TestSerialGatewaySenderEventProcessor processor; + + @Before + public void setUp() throws Exception { + this.sender = mock(AbstractGatewaySender.class); + this.processor = new TestSerialGatewaySenderEventProcessor(this.sender, "ny"); + } + + @Test + public void validateUnprocessedTokensMapUpdated() throws Exception { + GatewaySenderStats gss = mock(GatewaySenderStats.class); + when(sender.getStatistics()).thenReturn(gss); + + // Handle primary event + EventID id = handlePrimaryEvent(); + + // Verify the token was added by checking the correct stat methods were called and the size of + // the unprocessedTokensMap. + verify(gss).incUnprocessedTokensAddedByPrimary(); + verify(gss, never()).incUnprocessedEventsRemovedByPrimary(); + assertEquals(1, this.processor.getUnprocessedTokensSize()); + + // Handle the event from the secondary. The call to enqueueEvent is necessary to synchronize the + // unprocessedEventsLock and prevent the assertion error in basicHandleSecondaryEvent. + EntryEventImpl event = mock(EntryEventImpl.class); + when(event.getRegion()).thenReturn(mock(LocalRegion.class)); + when(event.getEventId()).thenReturn(id); + when(event.getOperation()).thenReturn(Operation.CREATE); + this.processor.enqueueEvent(null, event, null); + + // Verify the token was removed by checking the correct stat methods were called and the size of + // the unprocessedTokensMap. + verify(gss).incUnprocessedTokensRemovedBySecondary(); + verify(gss, never()).incUnprocessedEventsAddedBySecondary(); + assertEquals(0, this.processor.getUnprocessedTokensSize()); + } + + @Test + public void validateUnprocessedTokensMapReaping() throws Exception { + // Set the token timeout low + int originalTokenTimeout = AbstractGatewaySender.TOKEN_TIMEOUT; + AbstractGatewaySender.TOKEN_TIMEOUT = 500; + try { + GatewaySenderStats gss = mock(GatewaySenderStats.class); + when(sender.getStatistics()).thenReturn(gss); + + // Add REAP_THRESHOLD + 1 events to the unprocessed tokens map. This causes the uncheckedCount + // in the reaper to be REAP_THRESHOLD. The next event will cause the reaper to run.\ + int numEvents = SerialGatewaySenderEventProcessor.REAP_THRESHOLD + 1; + for (int i = 0; i < numEvents; i++) { + handlePrimaryEvent(); + } + assertEquals(numEvents, this.processor.getUnprocessedTokensSize()); + + // Wait for the timeout + Thread.sleep(AbstractGatewaySender.TOKEN_TIMEOUT + 1000); + + // Add one more event to the unprocessed tokens map. This will reap all of the previous + // tokens. + handlePrimaryEvent(); + assertEquals(1, this.processor.getUnprocessedTokensSize()); + } finally { + AbstractGatewaySender.TOKEN_TIMEOUT = originalTokenTimeout; + } + } + + private EventID handlePrimaryEvent() { + GatewaySenderEventImpl gsei = mock(GatewaySenderEventImpl.class); + EventID id = mock(EventID.class); + when(gsei.getEventId()).thenReturn(id); + this.processor.basicHandlePrimaryEvent(gsei); + return id; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/6a17c9b1/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/TestSerialGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/TestSerialGatewaySenderEventProcessor.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/TestSerialGatewaySenderEventProcessor.java new file mode 100644 index 0000000..cf453e6 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/TestSerialGatewaySenderEventProcessor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.serial; + +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; + +public class TestSerialGatewaySenderEventProcessor extends SerialGatewaySenderEventProcessor { + + public TestSerialGatewaySenderEventProcessor(AbstractGatewaySender sender, String id) { + super(sender, id); + } + + protected void initializeMessageQueue(String id) { + // Overridden to not create the RegionQueue in the constructor. + } + + protected int getUnprocessedTokensSize() { + return this.unprocessedTokens.size(); + } +}
