This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-8478
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 3683f708ae75445e6feb177d766e86c8c828c5ea
Author: Barry Oglesby <[email protected]>
AuthorDate: Tue Sep 1 12:21:15 2020 -0700

    GEODE-8478: Refactored logThresholdExceededAlerts and modified it to handle exceptions
---
 ...GatewaySenderEventProcessorIntegrationTest.java | 76 ++++++++++++++++++++++
 .../wan/AbstractGatewaySenderEventProcessor.java   | 28 +++++---
 2 files changed, 95 insertions(+), 9 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessorIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessorIntegrationTest.java
new file mode 100644
index 0000000..44778d7
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessorIntegrationTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+
+public class AbstractGatewaySenderEventProcessorIntegrationTest {
+
+  protected InternalCache cache;
+
+  @Before
+  public void setUp() {
+    this.cache = (InternalCache) new CacheFactory().create();
+  }
+
+  @After
+  public void tearDown() {
+    if (this.cache != null) {
+      this.cache.close();
+    }
+  }
+
+  @Test
+  public void verifyThresholdExceededAlertLogDoesNotThrowException() {
+    // Mock the sender
+    AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
+    when(sender.getAlertThreshold()).thenReturn(1);
+    when(sender.getStatistics()).thenReturn(mock(GatewaySenderStats.class));
+
+    // Mock the processor
+    AbstractGatewaySenderEventProcessor eventProcessor =
+        mock(AbstractGatewaySenderEventProcessor.class);
+    when(eventProcessor.getSender()).thenReturn(sender);
+
+    // Mock the region
+    LocalRegion lr = mock(LocalRegion.class);
+    when(lr.getCache()).thenReturn(this.cache);
+
+    // Create the events
+    List<GatewaySenderEventImpl> events = new ArrayList<>();
+    GatewaySenderEventImpl gsei1 = mock(GatewaySenderEventImpl.class);
+    when(gsei1.getValueAsString(true)).thenThrow(new IllegalStateException("test"));
+    events.add(gsei1);
+
+    // Invoke the real method
+    doCallRealMethod().when(eventProcessor).logThresholdExceededAlerts(events);
+    assertThatCode(() -> eventProcessor.logThresholdExceededAlerts(events))
+        .doesNotThrowAnyException();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index e2ab85b..cecf32d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -970,7 +970,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
 
     filteredList.clear();
     eventQueueRemove(events.size());
-    final GatewaySenderStats statistics = this.sender.getStatistics();
+
+    logThresholdExceededAlerts(events);
+
     int queueSize = eventQueueSize();
 
     if (this.eventQueueSizeWarning && queueSize <= AbstractGatewaySender.QUEUE_SIZE_THRESHOLD) {
@@ -1037,24 +1039,32 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
       }
       eventQueueRemove(events.size());
 
-      final GatewaySenderStats statistics = this.sender.getStatistics();
+      logThresholdExceededAlerts(events);
+    }
+  }
 
-      // Log an alert for each event if necessary
-      if (this.sender.getAlertThreshold() > 0) {
-        Iterator it = events.iterator();
-        long currentTime = System.currentTimeMillis();
-        while (it.hasNext()) {
+  protected void logThresholdExceededAlerts(List<GatewaySenderEventImpl> events) {
+    // Log an alert for each event if necessary
+    if (getSender().getAlertThreshold() > 0) {
+      Iterator it = events.iterator();
+      long currentTime = System.currentTimeMillis();
+      while (it.hasNext()) {
+        try {
           Object o = it.next();
           if (o != null && o instanceof GatewaySenderEventImpl) {
             GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o;
-            if (ge.getCreationTime() + this.sender.getAlertThreshold() < currentTime) {
+            if (ge.getCreationTime() + getSender().getAlertThreshold() < currentTime) {
               logger.warn(
                   "{} event for region={} key={} value={} was in the queue for {} milliseconds",
                   new Object[] {ge.getOperation(), ge.getRegionPath(), ge.getKey(),
                       ge.getValueAsString(true), currentTime - ge.getCreationTime()});
-              statistics.incEventsExceedingAlertThreshold();
+              getSender().getStatistics().incEventsExceedingAlertThreshold();
             }
           }
+        } catch (Exception e) {
+          logger.warn("Caught the following exception attempting to log threshold exceeded alert:",
+              e);
+          getSender().getStatistics().incEventsExceedingAlertThreshold();
         }
       }
     }

Reply via email to