This is an automated email from the ASF dual-hosted git repository. klund pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit dffcb9446aef09c7bf6e626121f4d2ec5c74586f Author: Kirk Lund <[email protected]> AuthorDate: Tue Sep 17 10:39:05 2019 -0700 GEODE-7152: Send alert messages using executor AlertAppender uses a ThreadLocal to prevent recursive calls from actually doing anything. However, a recent upgrade to our log4j dependencies seems to have changed the behavior such that log4j refuses to invoke doAppend if the thread is currently handling a sendAlert initiated from doAppend. To fix this bug, sendAlert must be async. --- .../alerting/internal/ClusterAlertMessaging.java | 63 ++++++++++++---------- .../internal/ClusterAlertMessagingTest.java | 41 ++++++++++++-- 2 files changed, 72 insertions(+), 32 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java b/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java index ac49dbd..8094270 100644 --- a/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java +++ b/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java @@ -14,7 +14,10 @@ */ package org.apache.geode.alerting.internal; +import static org.apache.geode.internal.logging.LoggingExecutors.newFixedThreadPool; + import java.time.Instant; +import java.util.concurrent.ExecutorService; import org.apache.logging.log4j.Logger; @@ -35,20 +38,24 @@ public class ClusterAlertMessaging implements AlertMessaging { private final InternalDistributedSystem system; private final DistributionManager dm; private final AlertListenerMessageFactory alertListenerMessageFactory; + private final ExecutorService executor; public ClusterAlertMessaging(final InternalDistributedSystem system) { this(system, system.getDistributionManager(), - new AlertListenerMessageFactory()); + new AlertListenerMessageFactory(), + newFixedThreadPool("AlertingMessaging Processor", true, 1)); } @VisibleForTesting ClusterAlertMessaging(final InternalDistributedSystem system, final DistributionManager dm, - final AlertListenerMessageFactory alertListenerMessageFactory) { + final AlertListenerMessageFactory alertListenerMessageFactory, + final ExecutorService executor) { this.system = system; this.dm = dm; this.alertListenerMessageFactory = alertListenerMessageFactory; + this.executor = executor; } @Override @@ -59,35 +66,37 @@ public class ClusterAlertMessaging implements AlertMessaging { final long threadId, final String formattedMessage, final String stackTrace) { - try { - String connectionName = system.getConfig().getName(); - - AlertListenerMessage message = - alertListenerMessageFactory.createAlertListenerMessage(member, alertLevel, timestamp, - connectionName, threadName, threadId, formattedMessage, stackTrace); - - if (member.equals(system.getDistributedMember())) { - // process in local member - logger.debug("Processing local alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].", - member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage, - stackTrace); - processAlertListenerMessage(message); - - } else { - // send to remote member - logger.debug("Sending remote alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].", - member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage, - stackTrace); - dm.putOutgoing(message); + executor.submit(() -> { + try { + String connectionName = system.getConfig().getName(); + + AlertListenerMessage message = + alertListenerMessageFactory.createAlertListenerMessage(member, alertLevel, timestamp, + connectionName, threadName, threadId, formattedMessage, stackTrace); + + if (member.equals(system.getDistributedMember())) { + // process in local member + logger.debug("Processing local alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].", + member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage, + stackTrace); + processAlertListenerMessage(message); + + } else { + // send to remote member + logger.debug("Sending remote alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].", + member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage, + stackTrace); + dm.putOutgoing(message); + } + } catch (ReenteredConnectException ignore) { + // OK. We can't send to this recipient because we're in the middle of + // trying to connect to it. } - } catch (ReenteredConnectException ignore) { - // OK. We can't send to this recipient because we're in the middle of - // trying to connect to it. - } + }); } public void close() { - // nothing + executor.shutdownNow(); } @Override diff --git a/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java b/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java index df413e7..3242ae7 100644 --- a/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java +++ b/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java @@ -17,6 +17,7 @@ package org.apache.geode.alerting.internal; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowable; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.eq; @@ -26,6 +27,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.junit.Before; import org.junit.Rule; @@ -34,6 +38,7 @@ import org.junit.experimental.categories.Category; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; import org.mockito.quality.Strictness; +import org.mockito.stubbing.Answer; import org.apache.geode.alerting.internal.spi.AlertLevel; import org.apache.geode.distributed.DistributedMember; @@ -74,7 +79,7 @@ public class ClusterAlertMessagingTest { @Test public void sendAlertProcessesMessageIfMemberIsLocal() { ClusterAlertMessaging clusterAlertMessaging = spyClusterAlertMessaging( - mock(ClusterDistributionManager.class)); + mock(ClusterDistributionManager.class), currentThreadExecutorService()); clusterAlertMessaging.sendAlert(localMember, AlertLevel.WARNING, Instant.now(), "threadName", Thread.currentThread().getId(), "formattedMessage", "stackTrace"); @@ -85,7 +90,8 @@ public class ClusterAlertMessagingTest { @Test public void sendAlertSendsMessageIfMemberIsRemote() { DistributionManager dm = mock(ClusterDistributionManager.class); - ClusterAlertMessaging clusterAlertMessaging = spyClusterAlertMessaging(dm); + ClusterAlertMessaging clusterAlertMessaging = + spyClusterAlertMessaging(dm, currentThreadExecutorService()); clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName", Thread.currentThread().getId(), "formattedMessage", "stackTrace"); @@ -94,9 +100,21 @@ public class ClusterAlertMessagingTest { } @Test + public void sendAlertUsesExecutorService() { + ExecutorService executor = currentThreadExecutorService(); + ClusterAlertMessaging clusterAlertMessaging = + spyClusterAlertMessaging(mock(ClusterDistributionManager.class), executor); + + clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName", + Thread.currentThread().getId(), "formattedMessage", "stackTrace"); + + verify(executor).submit(any(Runnable.class)); + } + + @Test public void processAlertListenerMessage_requires_ClusterDistributionManager() { ClusterAlertMessaging clusterAlertMessaging = spy(new ClusterAlertMessaging(system, - mock(DistributionManager.class), alertListenerMessageFactory)); + mock(DistributionManager.class), alertListenerMessageFactory, mock(ExecutorService.class))); Throwable thrown = catchThrowable( () -> clusterAlertMessaging.processAlertListenerMessage(alertListenerMessage)); @@ -104,7 +122,8 @@ public class ClusterAlertMessagingTest { assertThat(thrown).isInstanceOf(IllegalArgumentException.class); } - private ClusterAlertMessaging spyClusterAlertMessaging(DistributionManager distributionManager) { + private ClusterAlertMessaging spyClusterAlertMessaging(DistributionManager distributionManager, + ExecutorService executorService) { when(alertListenerMessageFactory.createAlertListenerMessage(any(DistributedMember.class), any(AlertLevel.class), any(Instant.class), anyString(), anyString(), anyLong(), anyString(), anyString())) @@ -115,6 +134,18 @@ public class ClusterAlertMessagingTest { .thenReturn(config); when(system.getDistributedMember()) .thenReturn(localMember); - return spy(new ClusterAlertMessaging(system, distributionManager, alertListenerMessageFactory)); + return spy(new ClusterAlertMessaging(system, distributionManager, alertListenerMessageFactory, + executorService)); + } + + private ExecutorService currentThreadExecutorService() { + ExecutorService executor = mock(ExecutorService.class); + when(executor.submit(isA(Runnable.class))) + .thenAnswer((Answer<Future<?>>) invocation -> { + Runnable task = invocation.getArgument(0); + task.run(); + return CompletableFuture.completedFuture(null); + }); + return executor; } }
