This is an automated email from the ASF dual-hosted git repository. boglesby pushed a commit to branch feature/GEODE-8478 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 3683f708ae75445e6feb177d766e86c8c828c5ea Author: Barry Oglesby <[email protected]> AuthorDate: Tue Sep 1 12:21:15 2020 -0700 GEODE-8478: Refactored logThresholdExceededAlerts and modified it to handle exceptions --- ...GatewaySenderEventProcessorIntegrationTest.java | 76 ++++++++++++++++++++++ .../wan/AbstractGatewaySenderEventProcessor.java | 28 +++++--- 2 files changed, 95 insertions(+), 9 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessorIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessorIntegrationTest.java new file mode 100644 index 0000000..44778d7 --- /dev/null +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessorIntegrationTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.LocalRegion; + +public class AbstractGatewaySenderEventProcessorIntegrationTest { + + protected InternalCache cache; + + @Before + public void setUp() { + this.cache = (InternalCache) new CacheFactory().create(); + } + + @After + public void tearDown() { + if (this.cache != null) { + this.cache.close(); + } + } + + @Test + public void verifyThresholdExceededAlertLogDoesNotThrowException() { + // Mock the sender + AbstractGatewaySender sender = mock(AbstractGatewaySender.class); + when(sender.getAlertThreshold()).thenReturn(1); + when(sender.getStatistics()).thenReturn(mock(GatewaySenderStats.class)); + + // Mock the processor + AbstractGatewaySenderEventProcessor eventProcessor = + mock(AbstractGatewaySenderEventProcessor.class); + when(eventProcessor.getSender()).thenReturn(sender); + + // Mock the region + LocalRegion lr = mock(LocalRegion.class); + when(lr.getCache()).thenReturn(this.cache); + + // Create the events + List<GatewaySenderEventImpl> events = new ArrayList<>(); + GatewaySenderEventImpl gsei1 = mock(GatewaySenderEventImpl.class); + when(gsei1.getValueAsString(true)).thenThrow(new IllegalStateException("test")); + events.add(gsei1); + + // Invoke the real method + doCallRealMethod().when(eventProcessor).logThresholdExceededAlerts(events); + assertThatCode(() -> eventProcessor.logThresholdExceededAlerts(events)) + .doesNotThrowAnyException(); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index e2ab85b..cecf32d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -970,7 +970,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread filteredList.clear(); eventQueueRemove(events.size()); - final GatewaySenderStats statistics = this.sender.getStatistics(); + + logThresholdExceededAlerts(events); + int queueSize = eventQueueSize(); if (this.eventQueueSizeWarning && queueSize <= AbstractGatewaySender.QUEUE_SIZE_THRESHOLD) { @@ -1037,24 +1039,32 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread } eventQueueRemove(events.size()); - final GatewaySenderStats statistics = this.sender.getStatistics(); + logThresholdExceededAlerts(events); + } + } - // Log an alert for each event if necessary - if (this.sender.getAlertThreshold() > 0) { - Iterator it = events.iterator(); - long currentTime = System.currentTimeMillis(); - while (it.hasNext()) { + protected void logThresholdExceededAlerts(List<GatewaySenderEventImpl> events) { + // Log an alert for each event if necessary + if (getSender().getAlertThreshold() > 0) { + Iterator it = events.iterator(); + long currentTime = System.currentTimeMillis(); + while (it.hasNext()) { + try { Object o = it.next(); if (o != null && o instanceof GatewaySenderEventImpl) { GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o; - if (ge.getCreationTime() + this.sender.getAlertThreshold() < currentTime) { + if (ge.getCreationTime() + getSender().getAlertThreshold() < currentTime) { logger.warn( "{} event for region={} key={} value={} was in the queue for {} milliseconds", new Object[] {ge.getOperation(), ge.getRegionPath(), ge.getKey(), ge.getValueAsString(true), currentTime - ge.getCreationTime()}); - statistics.incEventsExceedingAlertThreshold(); + getSender().getStatistics().incEventsExceedingAlertThreshold(); } } + } catch (Exception e) { + logger.warn("Caught the following exception attempting to log threshold exceeded alert:", + e); + getSender().getStatistics().incEventsExceedingAlertThreshold(); } } }
