This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit dffcb9446aef09c7bf6e626121f4d2ec5c74586f
Author: Kirk Lund <[email protected]>
AuthorDate: Tue Sep 17 10:39:05 2019 -0700

    GEODE-7152: Send alert messages using executor
    
    AlertAppender uses a ThreadLocal to prevent recursive calls from
    actually doing anything. However, a recent upgrade to our log4j
    dependencies seems to have changed the behavior such that log4j refuses
    to invoke doAppend if the thread is currently handling a sendAlert
    initiated from doAppend. To fix this bug, sendAlert must be async.
---
 .../alerting/internal/ClusterAlertMessaging.java   | 63 ++++++++++++----------
 .../internal/ClusterAlertMessagingTest.java        | 41 ++++++++++++--
 2 files changed, 72 insertions(+), 32 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java b/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java
index ac49dbd..8094270 100644
--- a/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java
+++ b/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java
@@ -14,7 +14,10 @@
  */
 package org.apache.geode.alerting.internal;
 
+import static org.apache.geode.internal.logging.LoggingExecutors.newFixedThreadPool;
+
 import java.time.Instant;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.logging.log4j.Logger;
 
@@ -35,20 +38,24 @@ public class ClusterAlertMessaging implements AlertMessaging {
   private final InternalDistributedSystem system;
   private final DistributionManager dm;
   private final AlertListenerMessageFactory alertListenerMessageFactory;
+  private final ExecutorService executor;
 
   public ClusterAlertMessaging(final InternalDistributedSystem system) {
     this(system,
         system.getDistributionManager(),
-        new AlertListenerMessageFactory());
+        new AlertListenerMessageFactory(),
+        newFixedThreadPool("AlertingMessaging Processor", true, 1));
   }
 
   @VisibleForTesting
   ClusterAlertMessaging(final InternalDistributedSystem system,
       final DistributionManager dm,
-      final AlertListenerMessageFactory alertListenerMessageFactory) {
+      final AlertListenerMessageFactory alertListenerMessageFactory,
+      final ExecutorService executor) {
     this.system = system;
     this.dm = dm;
     this.alertListenerMessageFactory = alertListenerMessageFactory;
+    this.executor = executor;
   }
 
   @Override
@@ -59,35 +66,37 @@ public class ClusterAlertMessaging implements AlertMessaging {
       final long threadId,
       final String formattedMessage,
       final String stackTrace) {
-    try {
-      String connectionName = system.getConfig().getName();
-
-      AlertListenerMessage message =
-          alertListenerMessageFactory.createAlertListenerMessage(member, alertLevel, timestamp,
-              connectionName, threadName, threadId, formattedMessage, stackTrace);
-
-      if (member.equals(system.getDistributedMember())) {
-        // process in local member
-        logger.debug("Processing local alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].",
-            member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage,
-            stackTrace);
-        processAlertListenerMessage(message);
-
-      } else {
-        // send to remote member
-        logger.debug("Sending remote alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].",
-            member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage,
-            stackTrace);
-        dm.putOutgoing(message);
+    executor.submit(() -> {
+      try {
+        String connectionName = system.getConfig().getName();
+
+        AlertListenerMessage message =
+            alertListenerMessageFactory.createAlertListenerMessage(member, alertLevel, timestamp,
+                connectionName, threadName, threadId, formattedMessage, stackTrace);
+
+        if (member.equals(system.getDistributedMember())) {
+          // process in local member
+          logger.debug("Processing local alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].",
+              member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage,
+              stackTrace);
+          processAlertListenerMessage(message);
+
+        } else {
+          // send to remote member
+          logger.debug("Sending remote alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].",
+              member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage,
+              stackTrace);
+          dm.putOutgoing(message);
+        }
+      } catch (ReenteredConnectException ignore) {
+        // OK. We can't send to this recipient because we're in the middle of
+        // trying to connect to it.
       }
-    } catch (ReenteredConnectException ignore) {
-      // OK. We can't send to this recipient because we're in the middle of
-      // trying to connect to it.
-    }
+    });
   }
 
   public void close() {
-    // nothing
+    executor.shutdownNow();
   }
 
   @Override
diff --git a/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java b/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java
index df413e7..3242ae7 100644
--- a/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java
+++ b/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.alerting.internal;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.eq;
@@ -26,6 +27,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.time.Instant;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.junit.Before;
 import org.junit.Rule;
@@ -34,6 +38,7 @@ import org.junit.experimental.categories.Category;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 import org.mockito.quality.Strictness;
+import org.mockito.stubbing.Answer;
 
 import org.apache.geode.alerting.internal.spi.AlertLevel;
 import org.apache.geode.distributed.DistributedMember;
@@ -74,7 +79,7 @@ public class ClusterAlertMessagingTest {
   @Test
   public void sendAlertProcessesMessageIfMemberIsLocal() {
     ClusterAlertMessaging clusterAlertMessaging = spyClusterAlertMessaging(
-        mock(ClusterDistributionManager.class));
+        mock(ClusterDistributionManager.class), currentThreadExecutorService());
 
     clusterAlertMessaging.sendAlert(localMember, AlertLevel.WARNING, Instant.now(), "threadName",
         Thread.currentThread().getId(), "formattedMessage", "stackTrace");
@@ -85,7 +90,8 @@ public class ClusterAlertMessagingTest {
   @Test
   public void sendAlertSendsMessageIfMemberIsRemote() {
     DistributionManager dm = mock(ClusterDistributionManager.class);
-    ClusterAlertMessaging clusterAlertMessaging = spyClusterAlertMessaging(dm);
+    ClusterAlertMessaging clusterAlertMessaging =
+        spyClusterAlertMessaging(dm, currentThreadExecutorService());
 
     clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName",
         Thread.currentThread().getId(), "formattedMessage", "stackTrace");
@@ -94,9 +100,21 @@ public class ClusterAlertMessagingTest {
   }
 
   @Test
+  public void sendAlertUsesExecutorService() {
+    ExecutorService executor = currentThreadExecutorService();
+    ClusterAlertMessaging clusterAlertMessaging =
+        spyClusterAlertMessaging(mock(ClusterDistributionManager.class), executor);
+
+    clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName",
+        Thread.currentThread().getId(), "formattedMessage", "stackTrace");
+
+    verify(executor).submit(any(Runnable.class));
+  }
+
+  @Test
   public void processAlertListenerMessage_requires_ClusterDistributionManager() {
     ClusterAlertMessaging clusterAlertMessaging = spy(new ClusterAlertMessaging(system,
-        mock(DistributionManager.class), alertListenerMessageFactory));
+        mock(DistributionManager.class), alertListenerMessageFactory, mock(ExecutorService.class)));
 
     Throwable thrown = catchThrowable(
         () -> clusterAlertMessaging.processAlertListenerMessage(alertListenerMessage));
@@ -104,7 +122,8 @@ public class ClusterAlertMessagingTest {
     assertThat(thrown).isInstanceOf(IllegalArgumentException.class);
   }
 
-  private ClusterAlertMessaging spyClusterAlertMessaging(DistributionManager distributionManager) {
+  private ClusterAlertMessaging spyClusterAlertMessaging(DistributionManager distributionManager,
+      ExecutorService executorService) {
     when(alertListenerMessageFactory.createAlertListenerMessage(any(DistributedMember.class),
         any(AlertLevel.class), any(Instant.class), anyString(), anyString(), anyLong(), anyString(),
         anyString()))
@@ -115,6 +134,18 @@ public class ClusterAlertMessagingTest {
         .thenReturn(config);
     when(system.getDistributedMember())
         .thenReturn(localMember);
-    return spy(new ClusterAlertMessaging(system, distributionManager, alertListenerMessageFactory));
+    return spy(new ClusterAlertMessaging(system, distributionManager, alertListenerMessageFactory,
+        executorService));
+  }
+
+  private ExecutorService currentThreadExecutorService() {
+    ExecutorService executor = mock(ExecutorService.class);
+    when(executor.submit(isA(Runnable.class)))
+        .thenAnswer((Answer<Future<?>>) invocation -> {
+          Runnable task = invocation.getArgument(0);
+          task.run();
+          return CompletableFuture.completedFuture(null);
+        });
+    return executor;
   }
 }

Reply via email to