This is an automated email from the ASF dual-hosted git repository.
onichols pushed a commit to branch release/1.11.0
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/release/1.11.0 by this push:
new d38ad7b GEODE-7465: Set eventProcessor to null in serial AEQ when it
is stopped
d38ad7b is described below
commit d38ad7b0c85ede6a181e814e97a1693133433766
Author: Barry Oglesby <[email protected]>
AuthorDate: Thu Nov 21 14:34:38 2019 -1000
GEODE-7465: Set eventProcessor to null in serial AEQ when it is stopped
(cherry picked from commit e148cef9cb63eba283cf86bc490eb280023567ce)
---
.../AsyncEventListenerDistributedTest.java | 25 ++
...AsyncEventListenerStopStartDistributedTest.java | 307 +++++++++++++++++++++
.../SerialAsyncEventQueueImplJUnitTest.java | 27 ++
.../internal/SerialAsyncEventQueueImpl.java | 32 ++-
.../internal/cache/wan/AbstractGatewaySender.java | 4 +-
.../wan/AbstractGatewaySenderEventProcessor.java | 6 +-
...currentParallelGatewaySenderEventProcessor.java | 8 +-
...oncurrentSerialGatewaySenderEventProcessor.java | 8 +-
.../cache/wan/serial/SerialGatewaySenderQueue.java | 2 +-
.../internal/SerialAsyncEventQueueImplTest.java | 118 ++++++++
10 files changed, 514 insertions(+), 23 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
index 09cab0e..bb79369 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
@@ -161,6 +161,31 @@ public class AsyncEventListenerDistributedTest implements
Serializable {
}
@Test // serial, ReplicateRegion
+ public void testSerialAsyncEventQueueStopStart() {
+ vm0.invoke(this::createCache);
+ vm1.invoke(this::createCache);
+
+ vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new
SpyAsyncEventListener(), false,
+ 100, dispatcherThreadCount, 100));
+ vm1.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new
SpyAsyncEventListener(), false,
+ 100, dispatcherThreadCount, 100));
+
+ vm0.invoke(() -> createReplicateRegion(replicateRegionName,
asyncEventQueueId));
+ vm1.invoke(() -> createReplicateRegion(replicateRegionName,
asyncEventQueueId));
+
+ vm0.invoke(() -> getInternalGatewaySender().stop());
+ vm1.invoke(() -> getInternalGatewaySender().stop());
+
+ vm0.invoke(() -> doPuts(replicateRegionName, 10));
+
+ vm0.invoke(() -> getInternalGatewaySender().start());
+ vm1.invoke(() -> getInternalGatewaySender().start());
+
+ assertThat(vm0.invoke(() -> getAsyncEventQueue().size())).isEqualTo(0);
+ assertThat(vm1.invoke(() -> getAsyncEventQueue().size())).isEqualTo(0);
+ }
+
+ @Test // serial, ReplicateRegion
public void testReplicatedSerialAsyncEventQueue() {
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/ParallelSerialAsyncEventListenerStopStartDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/ParallelSerialAsyncEventListenerStopStartDistributedTest.java
new file mode 100644
index 0000000..29b26d0
--- /dev/null
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/ParallelSerialAsyncEventListenerStopStartDistributedTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.wan.asyncqueue;
+
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getCurrentVMNum;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
+import org.apache.geode.cache.asyncqueue.internal.InternalAsyncEventQueue;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.ThreadUtils;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.AEQTest;
+import
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+
+/**
+ * Extracted from {@link AsyncEventListenerDistributedTest}.
+ */
+@Category(AEQTest.class)
+@SuppressWarnings("serial")
+public class ParallelSerialAsyncEventListenerStopStartDistributedTest
implements Serializable {
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public DistributedRestoreSystemProperties restoreSystemProperties =
+ new DistributedRestoreSystemProperties();
+
+ @Rule
+ public SerializableTemporaryFolder temporaryFolder = new
SerializableTemporaryFolder();
+
+ private String basePartitionedRegionName;
+ private String baseReplicateRegionName;
+ private String baseSerialAsyncEventQueueId;
+ private String baseParallelAsyncEventQueueId;
+
+ private VM vm0;
+ private VM vm1;
+ private VM vm2;
+ private VM vm3;
+
+ @Before
+ public void setUp() throws Exception {
+ vm0 = getVM(0);
+ vm1 = getVM(1);
+ vm2 = getVM(2);
+ vm3 = getVM(3);
+
+ String className = getClass().getSimpleName();
+ basePartitionedRegionName = className + "_PR_";
+ baseReplicateRegionName = className + "_RR_";
+ baseSerialAsyncEventQueueId = className + "_serialAEQ_";
+ baseParallelAsyncEventQueueId = className + "_parallelAEQ_";
+ }
+
+ /**
+ * Override as needed to add to the configuration, such as
off-heap-memory-size.
+ */
+ protected Properties getDistributedSystemProperties() {
+ return new Properties();
+ }
+
+ /**
+ * Override as needed to add to the configuration, such as
regionFactory.setOffHeap(boolean).
+ */
+ protected RegionFactory<?, ?> configureRegion(RegionFactory<?, ?>
regionFactory) {
+ return regionFactory;
+ }
+
+ @Test
+ public void testStopStartPersistentParallelAndSerialAsyncEventQueues() {
+ // Create cache
+ vm0.invoke(() -> createCache());
+ vm1.invoke(() -> createCache());
+ vm2.invoke(() -> createCache());
+ vm3.invoke(() -> createCache());
+
+ // Create several serial and parallel AEQs
+ int numAEQs = 3;
+ vm0.invoke(() -> createAsyncEventQueues(numAEQs));
+ vm1.invoke(() -> createAsyncEventQueues(numAEQs));
+ vm2.invoke(() -> createAsyncEventQueues(numAEQs));
+ vm3.invoke(() -> createAsyncEventQueues(numAEQs));
+
+ // Create replicated and partitioned regions attached to those AEQs
+ int numRegions = 3;
+ vm0.invoke(() -> createRegions(numRegions));
+ vm1.invoke(() -> createRegions(numRegions));
+ vm2.invoke(() -> createRegions(numRegions));
+ vm3.invoke(() -> createRegions(numRegions));
+
+ // Do puts into all replicated and partitioned regions
+ vm0.invoke(() -> doPuts(numRegions, 1000));
+
+ // Wait for all AEQs to be empty
+ vm0.invoke(() -> waitForAsyncQueuesToEmpty());
+ vm1.invoke(() -> waitForAsyncQueuesToEmpty());
+ vm2.invoke(() -> waitForAsyncQueuesToEmpty());
+ vm3.invoke(() -> waitForAsyncQueuesToEmpty());
+
+ // Stop all AEQs
+ vm0.invoke(() -> stopAsyncQueues());
+ vm1.invoke(() -> stopAsyncQueues());
+ vm2.invoke(() -> stopAsyncQueues());
+ vm3.invoke(() -> stopAsyncQueues());
+
+ // Start all AEQs
+ AsyncInvocation startAeqsVm0 = vm0.invokeAsync(() -> startAsyncQueues());
+ AsyncInvocation startAeqsVm1 = vm1.invokeAsync(() -> startAsyncQueues());
+ AsyncInvocation startAeqsVm2 = vm2.invokeAsync(() -> startAsyncQueues());
+ AsyncInvocation startAeqsVm3 = vm3.invokeAsync(() -> startAsyncQueues());
+
+ // Wait for async tasks to complete
+ ThreadUtils.join(startAeqsVm0, 120 * 1000);
+ ThreadUtils.join(startAeqsVm1, 120 * 1000);
+ ThreadUtils.join(startAeqsVm2, 120 * 1000);
+ ThreadUtils.join(startAeqsVm3, 120 * 1000);
+ }
+
+ private void createAsyncEventQueues(int numAEQs) {
+ for (int i = 0; i < numAEQs; i++) {
+ createPersistentAsyncEventQueue(baseSerialAsyncEventQueueId + i,
+ createDiskStoreName(baseSerialAsyncEventQueueId + i), false);
+ createPersistentAsyncEventQueue(baseParallelAsyncEventQueueId + i,
+ createDiskStoreName(baseParallelAsyncEventQueueId + i), true);
+ }
+ }
+
+ private void createRegions(int numRegions) {
+ for (int i = 0; i < numRegions; i++) {
+ createReplicateRegion(baseReplicateRegionName + i,
baseSerialAsyncEventQueueId + i);
+ createPartitionedRegion(basePartitionedRegionName + i,
baseParallelAsyncEventQueueId + i);
+ }
+ }
+
+ private void waitForAsyncQueuesToEmpty() {
+ for (final AsyncEventQueue aeq : getAsyncEventQueues()) {
+ await().until(() -> aeq.size() == 0);
+ }
+ }
+
+ private void stopAsyncQueues() {
+ for (final AsyncEventQueue aeq : getAsyncEventQueues()) {
+ InternalAsyncEventQueue iaeq = (InternalAsyncEventQueue) aeq;
+ assertThat(iaeq.getSender().isRunning()).isTrue();
+ iaeq.stop();
+ assertThat(iaeq.getSender().isRunning()).isFalse();
+ }
+ }
+
+ private void startAsyncQueues() {
+ for (final AsyncEventQueue aeq : getAsyncEventQueues()) {
+ GatewaySender sender = ((InternalAsyncEventQueue) aeq).getSender();
+ assertThat(sender.isRunning()).isFalse();
+ sender.start();
+ assertThat(sender.isRunning()).isTrue();
+ }
+ }
+
+ private Collection<AsyncEventQueue> getAsyncEventQueues() {
+ return getCache().getAsyncEventQueues().stream()
+
.sorted(Comparator.comparing(AsyncEventQueue::getId)).collect(Collectors.toList());
+ }
+
+ private InternalCache getCache() {
+ return cacheRule.getOrCreateCache(getDistributedSystemProperties());
+ }
+
+ private void createCache() {
+ cacheRule.createCache(getDistributedSystemProperties());
+ }
+
+ private void createPartitionedRegion(String regionName, String
asyncEventQueueId) {
+ assertThat(regionName).isNotEmpty();
+ assertThat(asyncEventQueueId).isNotEmpty();
+
+ PartitionAttributesFactory<?, ?> partitionAttributesFactory = new
PartitionAttributesFactory();
+
+ RegionFactory<?, ?> regionFactory =
getCache().createRegionFactory(PARTITION_REDUNDANT);
+ regionFactory.addAsyncEventQueueId(asyncEventQueueId);
+ regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
+
+ configureRegion(regionFactory).create(regionName);
+ }
+
+ private void createReplicateRegion(String regionName, String
asyncEventQueueId) {
+ assertThat(regionName).isNotEmpty();
+ assertThat(asyncEventQueueId).isNotEmpty();
+
+ RegionFactory<?, ?> regionFactory =
getCache().createRegionFactory(REPLICATE);
+ regionFactory.addAsyncEventQueueId(asyncEventQueueId);
+
+ configureRegion(regionFactory).create(regionName);
+ }
+
+ private void createDiskStore(String diskStoreName, String asyncEventQueueId)
{
+ assertThat(diskStoreName).isNotEmpty();
+ assertThat(asyncEventQueueId).isNotEmpty();
+
+ File directory = createDirectory(createDiskStoreName(asyncEventQueueId));
+
+ DiskStoreFactory diskStoreFactory = getCache().createDiskStoreFactory();
+ diskStoreFactory.setDiskDirs(new File[] {directory});
+
+ diskStoreFactory.create(diskStoreName);
+ }
+
+ private File createDirectory(String name) {
+ assertThat(name).isNotEmpty();
+
+ File directory = new File(temporaryFolder.getRoot(), name);
+ if (!directory.exists()) {
+ try {
+ return temporaryFolder.newFolder(name);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ return directory;
+ }
+
+ private String createDiskStoreName(String asyncEventQueueId) {
+ assertThat(asyncEventQueueId).isNotEmpty();
+
+ return asyncEventQueueId + "_disk_" + getCurrentVMNum();
+ }
+
+ private void createPersistentAsyncEventQueue(String asyncEventQueueId,
String diskStoreName,
+ boolean isParallel) {
+ createDiskStore(diskStoreName, asyncEventQueueId);
+
+ AsyncEventQueueFactory asyncEventQueueFactory =
getCache().createAsyncEventQueueFactory();
+ asyncEventQueueFactory.setDiskStoreName(diskStoreName);
+ asyncEventQueueFactory.setParallel(isParallel);
+ asyncEventQueueFactory.setPersistent(true);
+
+ asyncEventQueueFactory.create(asyncEventQueueId, new
TestAsyncEventListener());
+ }
+
+ private void doPuts(int numRegions, int numPuts) {
+ for (int i = 0; i < numRegions; i++) {
+ Region<Integer, Integer> rr =
getCache().getRegion(baseReplicateRegionName + i);
+ Region<Integer, Integer> pr =
getCache().getRegion(basePartitionedRegionName + i);
+ for (int j = 0; j < numPuts; j++) {
+ rr.put(j, j);
+ pr.put(j, j);
+ }
+ }
+ }
+
+ private static class TestAsyncEventListener implements AsyncEventListener {
+
+ @Override
+ public synchronized boolean processEvents(List<AsyncEvent> events) {
+ return true;
+ }
+ }
+}
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index 0c23b9f..9138c8c 100644
---
a/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -68,4 +68,31 @@ public class SerialAsyncEventQueueImplJUnitTest {
assertEquals(0, queue.getStatistics().getEventsProcessedByPQRM());
}
+ @Test
+ public void testStopStart() {
+ GatewaySenderAttributes attrs = new GatewaySenderAttributes();
+ attrs.id = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
+ SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache,
+ cache.getInternalDistributedSystem().getStatisticsManager(),
cache.getStatisticsClock(),
+ attrs);
+ queue.getStatistics().incQueueSize(5);
+ queue.getStatistics().incSecondaryQueueSize(6);
+ queue.getStatistics().incTempQueueSize(10);
+ queue.getStatistics().incEventsProcessedByPQRM(3);
+
+ assertEquals(5, queue.getStatistics().getEventQueueSize());
+ assertEquals(6, queue.getStatistics().getSecondaryEventQueueSize());
+ assertEquals(10, queue.getStatistics().getTempEventQueueSize());
+ assertEquals(3, queue.getStatistics().getEventsProcessedByPQRM());
+
+ queue.start();
+ queue.stop();
+ queue.start();
+
+ assertEquals(0, queue.getStatistics().getEventQueueSize());
+ assertEquals(0, queue.getStatistics().getSecondaryEventQueueSize());
+ assertEquals(0, queue.getStatistics().getTempEventQueueSize());
+ assertEquals(0, queue.getStatistics().getEventsProcessedByPQRM());
+ }
+
}
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index de2a2d7..49ff9fc 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -32,6 +32,7 @@ import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import
org.apache.geode.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
@@ -85,13 +86,8 @@ public class SerialAsyncEventQueueImpl extends
AbstractGatewaySender {
getSenderAdvisor().makeSecondary();
}
}
- if (getDispatcherThreads() > 1) {
- eventProcessor = new ConcurrentSerialGatewaySenderEventProcessor(
- SerialAsyncEventQueueImpl.this, getThreadMonitorObj());
- } else {
- eventProcessor = new
SerialGatewaySenderEventProcessor(SerialAsyncEventQueueImpl.this,
- getId(), getThreadMonitorObj());
- }
+ eventProcessor = createEventProcessor();
+
if (startEventProcessorInPausedState) {
pauseEvenIfProcessorStopped();
}
@@ -105,13 +101,11 @@ public class SerialAsyncEventQueueImpl extends
AbstractGatewaySender {
}
new UpdateAttributesProcessor(this).distribute(false);
-
InternalDistributedSystem system =
(InternalDistributedSystem) this.cache.getDistributedSystem();
system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
- logger
- .info("Started {}", this);
+ logger.info("Started {}", this);
enqueueTempEvents();
} finally {
@@ -119,6 +113,19 @@ public class SerialAsyncEventQueueImpl extends
AbstractGatewaySender {
}
}
+ protected AbstractGatewaySenderEventProcessor createEventProcessor() {
+ AbstractGatewaySenderEventProcessor eventProcessor;
+ if (getDispatcherThreads() > 1) {
+ eventProcessor = new ConcurrentSerialGatewaySenderEventProcessor(
+ SerialAsyncEventQueueImpl.this, getThreadMonitorObj());
+ } else {
+ eventProcessor =
+ new
SerialGatewaySenderEventProcessor(SerialAsyncEventQueueImpl.this, getId(),
+ getThreadMonitorObj());
+ }
+ return eventProcessor;
+ }
+
@Override
public void stop() {
if (logger.isDebugEnabled()) {
@@ -177,12 +184,15 @@ public class SerialAsyncEventQueueImpl extends
AbstractGatewaySender {
InternalDistributedSystem system =
(InternalDistributedSystem) this.cache.getDistributedSystem();
system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
+
+ this.eventProcessor = null;
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
- sb.append("SerialGatewaySender{");
+ sb.append(getClass().getSimpleName());
+ sb.append("{");
sb.append("id=" + getId());
sb.append(",remoteDsId=" + getRemoteDSId());
sb.append(",isRunning =" + isRunning());
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 4fe0290..e7eea1b 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -789,10 +789,10 @@ public abstract class AbstractGatewaySender implements
InternalGatewaySender, Di
}
protected void waitForRunningStatus() {
- synchronized (this.eventProcessor.runningStateLock) {
+ synchronized (this.eventProcessor.getRunningStateLock()) {
while (this.eventProcessor.getException() == null &&
this.eventProcessor.isStopped()) {
try {
- this.eventProcessor.runningStateLock.wait();
+ this.eventProcessor.getRunningStateLock().wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 0cfee64..6a3a8b6 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -104,7 +104,7 @@ public abstract class AbstractGatewaySenderEventProcessor
extends LoggingThread
*/
protected final Object pausedLock = new Object();
- public final Object runningStateLock = new Object();
+ private final Object runningStateLock = new Object();
/**
* A boolean verifying whether a warning has already been issued if the
event queue has reached a
@@ -154,6 +154,10 @@ public abstract class AbstractGatewaySenderEventProcessor
extends LoggingThread
this.threadMonitoring = tMonitoring;
}
+ public Object getRunningStateLock() {
+ return runningStateLock;
+ }
+
@Override
public int getTotalQueueSize() {
return getQueue().size();
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index ef4d3e8..6cf556a 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -182,14 +182,14 @@ public class ConcurrentParallelGatewaySenderEventProcessor
this.ex = e;
}
- synchronized (this.runningStateLock) {
+ synchronized (this.getRunningStateLock()) {
if (ex != null) {
this.setException(ex);
setIsStopped(true);
} else {
setIsStopped(false);
}
- this.runningStateLock.notifyAll();
+ this.getRunningStateLock().notifyAll();
}
for (ParallelGatewaySenderEventProcessor parallelProcessor :
this.processors) {
@@ -206,10 +206,10 @@ public class ConcurrentParallelGatewaySenderEventProcessor
private void waitForRunningStatus() {
for (ParallelGatewaySenderEventProcessor parallelProcessor :
this.processors) {
- synchronized (parallelProcessor.runningStateLock) {
+ synchronized (parallelProcessor.getRunningStateLock()) {
while (parallelProcessor.getException() == null &&
parallelProcessor.isStopped()) {
try {
- parallelProcessor.runningStateLock.wait();
+ parallelProcessor.getRunningStateLock().wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index c1f116b..63f35c7 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -183,14 +183,14 @@ public class ConcurrentSerialGatewaySenderEventProcessor
this.ex = e;
}
- synchronized (this.runningStateLock) {
+ synchronized (this.getRunningStateLock()) {
if (ex != null) {
this.setException(ex);
setIsStopped(true);
} else {
setIsStopped(false);
}
- this.runningStateLock.notifyAll();
+ this.getRunningStateLock().notifyAll();
}
for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
@@ -213,10 +213,10 @@ public class ConcurrentSerialGatewaySenderEventProcessor
private void waitForRunningStatus() {
for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
- synchronized (serialProcessor.runningStateLock) {
+ synchronized (serialProcessor.getRunningStateLock()) {
while (serialProcessor.getException() == null &&
serialProcessor.isStopped()) {
try {
- serialProcessor.runningStateLock.wait();
+ serialProcessor.getRunningStateLock().wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index f1df2b8..d4f3818 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -907,7 +907,7 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
}
} else {
throw new IllegalStateException(
- "Queue region " + this.region.getFullPath() + " already exists.");
+ "Queue region " + this.regionName + " already exists.");
}
}
diff --git
a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplTest.java
b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplTest.java
new file mode 100644
index 0000000..4c4aa2e
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.cache.asyncqueue.internal;
+
+import static
org.apache.geode.cache.wan.GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor;
+import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
+import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.categories.AEQTest;
+
+/**
+ * Extracted from AsyncEventListenerDistributedTest.
+ */
+@Category(AEQTest.class)
+public class SerialAsyncEventQueueImplTest {
+
+ private InternalCache cache;
+
+ private SerialAsyncEventQueueImpl serialAsyncEventQueue;
+ private StatisticsFactory statisticsFactory;
+ private GatewaySenderAttributes gatewaySenderAttributes;
+ private StatisticsClock statisticsClock;
+
+ @Before
+ public void setUp() throws Exception {
+ cache = Fakes.cache();
+ when(cache.getRegion(any())).thenReturn(null);
+ when(cache.createVMRegion(any(), any(),
any())).thenReturn(mock(LocalRegion.class));
+
+ statisticsFactory = mock(StatisticsFactory.class);
+ when(statisticsFactory.createAtomicStatistics(any(),
any())).thenReturn(mock(Statistics.class));
+
+ gatewaySenderAttributes = mock(GatewaySenderAttributes.class);
+ when(gatewaySenderAttributes.getId()).thenReturn("sender");
+
when(gatewaySenderAttributes.getRemoteDSId()).thenReturn(DEFAULT_DISTRIBUTED_SYSTEM_ID);
+ when(gatewaySenderAttributes.getMaximumQueueMemory()).thenReturn(10);
+ when(gatewaySenderAttributes.getDispatcherThreads()).thenReturn(1);
+ when(gatewaySenderAttributes.isForInternalUse()).thenReturn(false);
+
+ statisticsClock = mock(StatisticsClock.class);
+
+ DistributedLockService distributedLockService =
mock(DistributedLockService.class);
+ when(distributedLockService.lock(any(), anyLong(),
anyLong())).thenReturn(true);
+
when(cache.getGatewaySenderLockService()).thenReturn(distributedLockService);
+ }
+
+ private SerialAsyncEventQueueImpl createSerialAsyncEventQueueImplSpy() {
+ GatewaySenderAdvisor gatewaySenderAdvisor =
mock(GatewaySenderAdvisor.class);
+ when(gatewaySenderAdvisor.isPrimary()).thenReturn(true);
+
+ AbstractGatewaySenderEventProcessor eventProcessor =
+ mock(AbstractGatewaySenderEventProcessor.class);
+ when(eventProcessor.isStopped()).thenReturn(false);
+ when(eventProcessor.getRunningStateLock()).thenReturn(mock(Object.class));
+
+ SerialAsyncEventQueueImpl serialAsyncEventQueue =
+ new SerialAsyncEventQueueImpl(cache, statisticsFactory,
statisticsClock,
+ gatewaySenderAttributes);
+ SerialAsyncEventQueueImpl spySerialAsyncEventQueue =
spy(serialAsyncEventQueue);
+
doReturn(gatewaySenderAdvisor).when(spySerialAsyncEventQueue).getSenderAdvisor();
+
doReturn(eventProcessor).when(spySerialAsyncEventQueue).createEventProcessor();
+ doReturn(null).when(spySerialAsyncEventQueue).getQueues();
+
+ return spySerialAsyncEventQueue;
+ }
+
+ @Test
+ public void whenStartedShouldCreateEventProcessor() {
+ serialAsyncEventQueue = createSerialAsyncEventQueueImplSpy();
+
+ serialAsyncEventQueue.start();
+
+ assertThat(serialAsyncEventQueue.getEventProcessor()).isNotNull();
+ }
+
+ @Test
+ public void whenStoppedShouldResetTheEventProcessor() {
+ serialAsyncEventQueue = createSerialAsyncEventQueueImplSpy();
+
+ serialAsyncEventQueue.stop();
+
+ assertThat(serialAsyncEventQueue.getEventProcessor()).isNull();
+ }
+
+}