This is an automated email from the ASF dual-hosted git repository.

onichols pushed a commit to branch release/1.11.0
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/release/1.11.0 by this push:
     new d38ad7b  GEODE-7465: Set eventProcessor to null in serial AEQ when it 
is stopped
d38ad7b is described below

commit d38ad7b0c85ede6a181e814e97a1693133433766
Author: Barry Oglesby <[email protected]>
AuthorDate: Thu Nov 21 14:34:38 2019 -1000

    GEODE-7465: Set eventProcessor to null in serial AEQ when it is stopped
    
    
    
    (cherry picked from commit e148cef9cb63eba283cf86bc490eb280023567ce)
---
 .../AsyncEventListenerDistributedTest.java         |  25 ++
 ...AsyncEventListenerStopStartDistributedTest.java | 307 +++++++++++++++++++++
 .../SerialAsyncEventQueueImplJUnitTest.java        |  27 ++
 .../internal/SerialAsyncEventQueueImpl.java        |  32 ++-
 .../internal/cache/wan/AbstractGatewaySender.java  |   4 +-
 .../wan/AbstractGatewaySenderEventProcessor.java   |   6 +-
 ...currentParallelGatewaySenderEventProcessor.java |   8 +-
 ...oncurrentSerialGatewaySenderEventProcessor.java |   8 +-
 .../cache/wan/serial/SerialGatewaySenderQueue.java |   2 +-
 .../internal/SerialAsyncEventQueueImplTest.java    | 118 ++++++++
 10 files changed, 514 insertions(+), 23 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
index 09cab0e..bb79369 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
@@ -161,6 +161,31 @@ public class AsyncEventListenerDistributedTest implements 
Serializable {
   }
 
   @Test // serial, ReplicateRegion
+  public void testSerialAsyncEventQueueStopStart() {
+    vm0.invoke(this::createCache);
+    vm1.invoke(this::createCache);
+
+    vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new 
SpyAsyncEventListener(), false,
+        100, dispatcherThreadCount, 100));
+    vm1.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new 
SpyAsyncEventListener(), false,
+        100, dispatcherThreadCount, 100));
+
+    vm0.invoke(() -> createReplicateRegion(replicateRegionName, 
asyncEventQueueId));
+    vm1.invoke(() -> createReplicateRegion(replicateRegionName, 
asyncEventQueueId));
+
+    vm0.invoke(() -> getInternalGatewaySender().stop());
+    vm1.invoke(() -> getInternalGatewaySender().stop());
+
+    vm0.invoke(() -> doPuts(replicateRegionName, 10));
+
+    vm0.invoke(() -> getInternalGatewaySender().start());
+    vm1.invoke(() -> getInternalGatewaySender().start());
+
+    assertThat(vm0.invoke(() -> getAsyncEventQueue().size())).isEqualTo(0);
+    assertThat(vm1.invoke(() -> getAsyncEventQueue().size())).isEqualTo(0);
+  }
+
+  @Test // serial, ReplicateRegion
   public void testReplicatedSerialAsyncEventQueue() {
     vm0.invoke(this::createCache);
     vm1.invoke(this::createCache);
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/ParallelSerialAsyncEventListenerStopStartDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/ParallelSerialAsyncEventListenerStopStartDistributedTest.java
new file mode 100644
index 0000000..29b26d0
--- /dev/null
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/ParallelSerialAsyncEventListenerStopStartDistributedTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.wan.asyncqueue;
+
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getCurrentVMNum;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
+import org.apache.geode.cache.asyncqueue.internal.InternalAsyncEventQueue;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.ThreadUtils;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.AEQTest;
+import 
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+
+/**
+ * Extracted from {@link AsyncEventListenerDistributedTest}.
+ */
+@Category(AEQTest.class)
+@SuppressWarnings("serial")
+public class ParallelSerialAsyncEventListenerStopStartDistributedTest 
implements Serializable {
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new 
SerializableTemporaryFolder();
+
+  private String basePartitionedRegionName;
+  private String baseReplicateRegionName;
+  private String baseSerialAsyncEventQueueId;
+  private String baseParallelAsyncEventQueueId;
+
+  private VM vm0;
+  private VM vm1;
+  private VM vm2;
+  private VM vm3;
+
+  @Before
+  public void setUp() throws Exception {
+    vm0 = getVM(0);
+    vm1 = getVM(1);
+    vm2 = getVM(2);
+    vm3 = getVM(3);
+
+    String className = getClass().getSimpleName();
+    basePartitionedRegionName = className + "_PR_";
+    baseReplicateRegionName = className + "_RR_";
+    baseSerialAsyncEventQueueId = className + "_serialAEQ_";
+    baseParallelAsyncEventQueueId = className + "_parallelAEQ_";
+  }
+
+  /**
+   * Override as needed to add to the configuration, such as 
off-heap-memory-size.
+   */
+  protected Properties getDistributedSystemProperties() {
+    return new Properties();
+  }
+
+  /**
+   * Override as needed to add to the configuration, such as 
regionFactory.setOffHeap(boolean).
+   */
+  protected RegionFactory<?, ?> configureRegion(RegionFactory<?, ?> 
regionFactory) {
+    return regionFactory;
+  }
+
+  @Test
+  public void testStopStartPersistentParallelAndSerialAsyncEventQueues() {
+    // Create cache
+    vm0.invoke(() -> createCache());
+    vm1.invoke(() -> createCache());
+    vm2.invoke(() -> createCache());
+    vm3.invoke(() -> createCache());
+
+    // Create several serial and parallel AEQs
+    int numAEQs = 3;
+    vm0.invoke(() -> createAsyncEventQueues(numAEQs));
+    vm1.invoke(() -> createAsyncEventQueues(numAEQs));
+    vm2.invoke(() -> createAsyncEventQueues(numAEQs));
+    vm3.invoke(() -> createAsyncEventQueues(numAEQs));
+
+    // Create replicated and partitioned regions attached to those AEQs
+    int numRegions = 3;
+    vm0.invoke(() -> createRegions(numRegions));
+    vm1.invoke(() -> createRegions(numRegions));
+    vm2.invoke(() -> createRegions(numRegions));
+    vm3.invoke(() -> createRegions(numRegions));
+
+    // Do puts into all replicated and partitioned regions
+    vm0.invoke(() -> doPuts(numRegions, 1000));
+
+    // Wait for all AEQs to be empty
+    vm0.invoke(() -> waitForAsyncQueuesToEmpty());
+    vm1.invoke(() -> waitForAsyncQueuesToEmpty());
+    vm2.invoke(() -> waitForAsyncQueuesToEmpty());
+    vm3.invoke(() -> waitForAsyncQueuesToEmpty());
+
+    // Stop all AEQs
+    vm0.invoke(() -> stopAsyncQueues());
+    vm1.invoke(() -> stopAsyncQueues());
+    vm2.invoke(() -> stopAsyncQueues());
+    vm3.invoke(() -> stopAsyncQueues());
+
+    // Start all AEQs
+    AsyncInvocation startAeqsVm0 = vm0.invokeAsync(() -> startAsyncQueues());
+    AsyncInvocation startAeqsVm1 = vm1.invokeAsync(() -> startAsyncQueues());
+    AsyncInvocation startAeqsVm2 = vm2.invokeAsync(() -> startAsyncQueues());
+    AsyncInvocation startAeqsVm3 = vm3.invokeAsync(() -> startAsyncQueues());
+
+    // Wait for async tasks to complete
+    ThreadUtils.join(startAeqsVm0, 120 * 1000);
+    ThreadUtils.join(startAeqsVm1, 120 * 1000);
+    ThreadUtils.join(startAeqsVm2, 120 * 1000);
+    ThreadUtils.join(startAeqsVm3, 120 * 1000);
+  }
+
+  private void createAsyncEventQueues(int numAEQs) {
+    for (int i = 0; i < numAEQs; i++) {
+      createPersistentAsyncEventQueue(baseSerialAsyncEventQueueId + i,
+          createDiskStoreName(baseSerialAsyncEventQueueId + i), false);
+      createPersistentAsyncEventQueue(baseParallelAsyncEventQueueId + i,
+          createDiskStoreName(baseParallelAsyncEventQueueId + i), true);
+    }
+  }
+
+  private void createRegions(int numRegions) {
+    for (int i = 0; i < numRegions; i++) {
+      createReplicateRegion(baseReplicateRegionName + i, 
baseSerialAsyncEventQueueId + i);
+      createPartitionedRegion(basePartitionedRegionName + i, 
baseParallelAsyncEventQueueId + i);
+    }
+  }
+
+  private void waitForAsyncQueuesToEmpty() {
+    for (final AsyncEventQueue aeq : getAsyncEventQueues()) {
+      await().until(() -> aeq.size() == 0);
+    }
+  }
+
+  private void stopAsyncQueues() {
+    for (final AsyncEventQueue aeq : getAsyncEventQueues()) {
+      InternalAsyncEventQueue iaeq = (InternalAsyncEventQueue) aeq;
+      assertThat(iaeq.getSender().isRunning()).isTrue();
+      iaeq.stop();
+      assertThat(iaeq.getSender().isRunning()).isFalse();
+    }
+  }
+
+  private void startAsyncQueues() {
+    for (final AsyncEventQueue aeq : getAsyncEventQueues()) {
+      GatewaySender sender = ((InternalAsyncEventQueue) aeq).getSender();
+      assertThat(sender.isRunning()).isFalse();
+      sender.start();
+      assertThat(sender.isRunning()).isTrue();
+    }
+  }
+
+  private Collection<AsyncEventQueue> getAsyncEventQueues() {
+    return getCache().getAsyncEventQueues().stream()
+        
.sorted(Comparator.comparing(AsyncEventQueue::getId)).collect(Collectors.toList());
+  }
+
+  private InternalCache getCache() {
+    return cacheRule.getOrCreateCache(getDistributedSystemProperties());
+  }
+
+  private void createCache() {
+    cacheRule.createCache(getDistributedSystemProperties());
+  }
+
+  private void createPartitionedRegion(String regionName, String 
asyncEventQueueId) {
+    assertThat(regionName).isNotEmpty();
+    assertThat(asyncEventQueueId).isNotEmpty();
+
+    PartitionAttributesFactory<?, ?> partitionAttributesFactory = new 
PartitionAttributesFactory();
+
+    RegionFactory<?, ?> regionFactory = 
getCache().createRegionFactory(PARTITION_REDUNDANT);
+    regionFactory.addAsyncEventQueueId(asyncEventQueueId);
+    regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
+
+    configureRegion(regionFactory).create(regionName);
+  }
+
+  private void createReplicateRegion(String regionName, String 
asyncEventQueueId) {
+    assertThat(regionName).isNotEmpty();
+    assertThat(asyncEventQueueId).isNotEmpty();
+
+    RegionFactory<?, ?> regionFactory = 
getCache().createRegionFactory(REPLICATE);
+    regionFactory.addAsyncEventQueueId(asyncEventQueueId);
+
+    configureRegion(regionFactory).create(regionName);
+  }
+
+  private void createDiskStore(String diskStoreName, String asyncEventQueueId) 
{
+    assertThat(diskStoreName).isNotEmpty();
+    assertThat(asyncEventQueueId).isNotEmpty();
+
+    File directory = createDirectory(createDiskStoreName(asyncEventQueueId));
+
+    DiskStoreFactory diskStoreFactory = getCache().createDiskStoreFactory();
+    diskStoreFactory.setDiskDirs(new File[] {directory});
+
+    diskStoreFactory.create(diskStoreName);
+  }
+
+  private File createDirectory(String name) {
+    assertThat(name).isNotEmpty();
+
+    File directory = new File(temporaryFolder.getRoot(), name);
+    if (!directory.exists()) {
+      try {
+        return temporaryFolder.newFolder(name);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+    return directory;
+  }
+
+  private String createDiskStoreName(String asyncEventQueueId) {
+    assertThat(asyncEventQueueId).isNotEmpty();
+
+    return asyncEventQueueId + "_disk_" + getCurrentVMNum();
+  }
+
+  private void createPersistentAsyncEventQueue(String asyncEventQueueId, 
String diskStoreName,
+      boolean isParallel) {
+    createDiskStore(diskStoreName, asyncEventQueueId);
+
+    AsyncEventQueueFactory asyncEventQueueFactory = 
getCache().createAsyncEventQueueFactory();
+    asyncEventQueueFactory.setDiskStoreName(diskStoreName);
+    asyncEventQueueFactory.setParallel(isParallel);
+    asyncEventQueueFactory.setPersistent(true);
+
+    asyncEventQueueFactory.create(asyncEventQueueId, new 
TestAsyncEventListener());
+  }
+
+  private void doPuts(int numRegions, int numPuts) {
+    for (int i = 0; i < numRegions; i++) {
+      Region<Integer, Integer> rr = 
getCache().getRegion(baseReplicateRegionName + i);
+      Region<Integer, Integer> pr = 
getCache().getRegion(basePartitionedRegionName + i);
+      for (int j = 0; j < numPuts; j++) {
+        rr.put(j, j);
+        pr.put(j, j);
+      }
+    }
+  }
+
+  private static class TestAsyncEventListener implements AsyncEventListener {
+
+    @Override
+    public synchronized boolean processEvents(List<AsyncEvent> events) {
+      return true;
+    }
+  }
+}
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index 0c23b9f..9138c8c 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -68,4 +68,31 @@ public class SerialAsyncEventQueueImplJUnitTest {
     assertEquals(0, queue.getStatistics().getEventsProcessedByPQRM());
   }
 
+  @Test
+  public void testStopStart() {
+    GatewaySenderAttributes attrs = new GatewaySenderAttributes();
+    attrs.id = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
+    SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache,
+        cache.getInternalDistributedSystem().getStatisticsManager(), 
cache.getStatisticsClock(),
+        attrs);
+    queue.getStatistics().incQueueSize(5);
+    queue.getStatistics().incSecondaryQueueSize(6);
+    queue.getStatistics().incTempQueueSize(10);
+    queue.getStatistics().incEventsProcessedByPQRM(3);
+
+    assertEquals(5, queue.getStatistics().getEventQueueSize());
+    assertEquals(6, queue.getStatistics().getSecondaryEventQueueSize());
+    assertEquals(10, queue.getStatistics().getTempEventQueueSize());
+    assertEquals(3, queue.getStatistics().getEventsProcessedByPQRM());
+
+    queue.start();
+    queue.stop();
+    queue.start();
+
+    assertEquals(0, queue.getStatistics().getEventQueueSize());
+    assertEquals(0, queue.getStatistics().getSecondaryEventQueueSize());
+    assertEquals(0, queue.getStatistics().getTempEventQueueSize());
+    assertEquals(0, queue.getStatistics().getEventsProcessedByPQRM());
+  }
+
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index de2a2d7..49ff9fc 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -32,6 +32,7 @@ import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.UpdateAttributesProcessor;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import 
org.apache.geode.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
 import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
 import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
@@ -85,13 +86,8 @@ public class SerialAsyncEventQueueImpl extends 
AbstractGatewaySender {
           getSenderAdvisor().makeSecondary();
         }
       }
-      if (getDispatcherThreads() > 1) {
-        eventProcessor = new ConcurrentSerialGatewaySenderEventProcessor(
-            SerialAsyncEventQueueImpl.this, getThreadMonitorObj());
-      } else {
-        eventProcessor = new 
SerialGatewaySenderEventProcessor(SerialAsyncEventQueueImpl.this,
-            getId(), getThreadMonitorObj());
-      }
+      eventProcessor = createEventProcessor();
+
       if (startEventProcessorInPausedState) {
         pauseEvenIfProcessorStopped();
       }
@@ -105,13 +101,11 @@ public class SerialAsyncEventQueueImpl extends 
AbstractGatewaySender {
       }
       new UpdateAttributesProcessor(this).distribute(false);
 
-
       InternalDistributedSystem system =
           (InternalDistributedSystem) this.cache.getDistributedSystem();
       system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
 
-      logger
-          .info("Started {}", this);
+      logger.info("Started {}", this);
 
       enqueueTempEvents();
     } finally {
@@ -119,6 +113,19 @@ public class SerialAsyncEventQueueImpl extends 
AbstractGatewaySender {
     }
   }
 
+  protected AbstractGatewaySenderEventProcessor createEventProcessor() {
+    AbstractGatewaySenderEventProcessor eventProcessor;
+    if (getDispatcherThreads() > 1) {
+      eventProcessor = new ConcurrentSerialGatewaySenderEventProcessor(
+          SerialAsyncEventQueueImpl.this, getThreadMonitorObj());
+    } else {
+      eventProcessor =
+          new 
SerialGatewaySenderEventProcessor(SerialAsyncEventQueueImpl.this, getId(),
+              getThreadMonitorObj());
+    }
+    return eventProcessor;
+  }
+
   @Override
   public void stop() {
     if (logger.isDebugEnabled()) {
@@ -177,12 +184,15 @@ public class SerialAsyncEventQueueImpl extends 
AbstractGatewaySender {
     InternalDistributedSystem system =
         (InternalDistributedSystem) this.cache.getDistributedSystem();
     system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
+
+    this.eventProcessor = null;
   }
 
   @Override
   public String toString() {
     StringBuffer sb = new StringBuffer();
-    sb.append("SerialGatewaySender{");
+    sb.append(getClass().getSimpleName());
+    sb.append("{");
     sb.append("id=" + getId());
     sb.append(",remoteDsId=" + getRemoteDSId());
     sb.append(",isRunning =" + isRunning());
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 4fe0290..e7eea1b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -789,10 +789,10 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
   }
 
   protected void waitForRunningStatus() {
-    synchronized (this.eventProcessor.runningStateLock) {
+    synchronized (this.eventProcessor.getRunningStateLock()) {
       while (this.eventProcessor.getException() == null && 
this.eventProcessor.isStopped()) {
         try {
-          this.eventProcessor.runningStateLock.wait();
+          this.eventProcessor.getRunningStateLock().wait();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 0cfee64..6a3a8b6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -104,7 +104,7 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends LoggingThread
    */
   protected final Object pausedLock = new Object();
 
-  public final Object runningStateLock = new Object();
+  private final Object runningStateLock = new Object();
 
   /**
    * A boolean verifying whether a warning has already been issued if the 
event queue has reached a
@@ -154,6 +154,10 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends LoggingThread
     this.threadMonitoring = tMonitoring;
   }
 
+  public Object getRunningStateLock() {
+    return runningStateLock;
+  }
+
   @Override
   public int getTotalQueueSize() {
     return getQueue().size();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index ef4d3e8..6cf556a 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -182,14 +182,14 @@ public class ConcurrentParallelGatewaySenderEventProcessor
       this.ex = e;
     }
 
-    synchronized (this.runningStateLock) {
+    synchronized (this.getRunningStateLock()) {
       if (ex != null) {
         this.setException(ex);
         setIsStopped(true);
       } else {
         setIsStopped(false);
       }
-      this.runningStateLock.notifyAll();
+      this.getRunningStateLock().notifyAll();
     }
 
     for (ParallelGatewaySenderEventProcessor parallelProcessor : 
this.processors) {
@@ -206,10 +206,10 @@ public class ConcurrentParallelGatewaySenderEventProcessor
 
   private void waitForRunningStatus() {
     for (ParallelGatewaySenderEventProcessor parallelProcessor : 
this.processors) {
-      synchronized (parallelProcessor.runningStateLock) {
+      synchronized (parallelProcessor.getRunningStateLock()) {
         while (parallelProcessor.getException() == null && 
parallelProcessor.isStopped()) {
           try {
-            parallelProcessor.runningStateLock.wait();
+            parallelProcessor.getRunningStateLock().wait();
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
           }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index c1f116b..63f35c7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -183,14 +183,14 @@ public class ConcurrentSerialGatewaySenderEventProcessor
       this.ex = e;
     }
 
-    synchronized (this.runningStateLock) {
+    synchronized (this.getRunningStateLock()) {
       if (ex != null) {
         this.setException(ex);
         setIsStopped(true);
       } else {
         setIsStopped(false);
       }
-      this.runningStateLock.notifyAll();
+      this.getRunningStateLock().notifyAll();
     }
 
     for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
@@ -213,10 +213,10 @@ public class ConcurrentSerialGatewaySenderEventProcessor
 
   private void waitForRunningStatus() {
     for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
-      synchronized (serialProcessor.runningStateLock) {
+      synchronized (serialProcessor.getRunningStateLock()) {
         while (serialProcessor.getException() == null && 
serialProcessor.isStopped()) {
           try {
-            serialProcessor.runningStateLock.wait();
+            serialProcessor.getRunningStateLock().wait();
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
           }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index f1df2b8..d4f3818 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -907,7 +907,7 @@ public class SerialGatewaySenderQueue implements 
RegionQueue {
       }
     } else {
       throw new IllegalStateException(
-          "Queue region " + this.region.getFullPath() + " already exists.");
+          "Queue region " + this.regionName + " already exists.");
     }
   }
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplTest.java
new file mode 100644
index 0000000..4c4aa2e
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.cache.asyncqueue.internal;
+
+import static 
org.apache.geode.cache.wan.GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor;
+import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
+import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.categories.AEQTest;
+
+/**
+ * Extracted from AsyncEventListenerDistributedTest.
+ */
+@Category(AEQTest.class)
+public class SerialAsyncEventQueueImplTest {
+
+  private InternalCache cache;
+
+  private SerialAsyncEventQueueImpl serialAsyncEventQueue;
+  private StatisticsFactory statisticsFactory;
+  private GatewaySenderAttributes gatewaySenderAttributes;
+  private StatisticsClock statisticsClock;
+
+  @Before
+  public void setUp() throws Exception {
+    cache = Fakes.cache();
+    when(cache.getRegion(any())).thenReturn(null);
+    when(cache.createVMRegion(any(), any(), 
any())).thenReturn(mock(LocalRegion.class));
+
+    statisticsFactory = mock(StatisticsFactory.class);
+    when(statisticsFactory.createAtomicStatistics(any(), 
any())).thenReturn(mock(Statistics.class));
+
+    gatewaySenderAttributes = mock(GatewaySenderAttributes.class);
+    when(gatewaySenderAttributes.getId()).thenReturn("sender");
+    
when(gatewaySenderAttributes.getRemoteDSId()).thenReturn(DEFAULT_DISTRIBUTED_SYSTEM_ID);
+    when(gatewaySenderAttributes.getMaximumQueueMemory()).thenReturn(10);
+    when(gatewaySenderAttributes.getDispatcherThreads()).thenReturn(1);
+    when(gatewaySenderAttributes.isForInternalUse()).thenReturn(false);
+
+    statisticsClock = mock(StatisticsClock.class);
+
+    DistributedLockService distributedLockService = 
mock(DistributedLockService.class);
+    when(distributedLockService.lock(any(), anyLong(), 
anyLong())).thenReturn(true);
+    
when(cache.getGatewaySenderLockService()).thenReturn(distributedLockService);
+  }
+
+  private SerialAsyncEventQueueImpl createSerialAsyncEventQueueImplSpy() {
+    GatewaySenderAdvisor gatewaySenderAdvisor = 
mock(GatewaySenderAdvisor.class);
+    when(gatewaySenderAdvisor.isPrimary()).thenReturn(true);
+
+    AbstractGatewaySenderEventProcessor eventProcessor =
+        mock(AbstractGatewaySenderEventProcessor.class);
+    when(eventProcessor.isStopped()).thenReturn(false);
+    when(eventProcessor.getRunningStateLock()).thenReturn(mock(Object.class));
+
+    SerialAsyncEventQueueImpl serialAsyncEventQueue =
+        new SerialAsyncEventQueueImpl(cache, statisticsFactory, 
statisticsClock,
+            gatewaySenderAttributes);
+    SerialAsyncEventQueueImpl spySerialAsyncEventQueue = 
spy(serialAsyncEventQueue);
+    
doReturn(gatewaySenderAdvisor).when(spySerialAsyncEventQueue).getSenderAdvisor();
+    
doReturn(eventProcessor).when(spySerialAsyncEventQueue).createEventProcessor();
+    doReturn(null).when(spySerialAsyncEventQueue).getQueues();
+
+    return spySerialAsyncEventQueue;
+  }
+
+  @Test
+  public void whenStartedShouldCreateEventProcessor() {
+    serialAsyncEventQueue = createSerialAsyncEventQueueImplSpy();
+
+    serialAsyncEventQueue.start();
+
+    assertThat(serialAsyncEventQueue.getEventProcessor()).isNotNull();
+  }
+
+  @Test
+  public void whenStoppedShouldResetTheEventProcessor() {
+    serialAsyncEventQueue = createSerialAsyncEventQueueImplSpy();
+
+    serialAsyncEventQueue.stop();
+
+    assertThat(serialAsyncEventQueue.getEventProcessor()).isNull();
+  }
+
+}

Reply via email to