This is an automated email from the ASF dual-hosted git repository.

pkarwasz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git


The following commit(s) were added to refs/heads/main by this push:
     new 50ef59b0f2 Improves stability of JeroMqAppenderTest
50ef59b0f2 is described below

commit 50ef59b0f2a210807267b760420989bfadf4106c
Author: Piotr P. Karwasz <[email protected]>
AuthorDate: Thu Mar 23 09:13:56 2023 +0100

    Improves stability of JeroMqAppenderTest
    
    The test needs to wait for the JeroMqAppender to receive the
    subscription request from the client.
    
    We change the socket type from PUB to XPUB to be able to receive
    subscriptions.
---
 .../log4j/jeromq/appender/JeroMqAppender.java       |  5 +++++
 .../log4j/jeromq/appender/JeroMqManager.java        | 16 ++++++++++++++--
 .../log4j/jeromq/appender/JeroMqAppenderTest.java   | 21 ++++++++++++++++-----
 3 files changed, 35 insertions(+), 7 deletions(-)

diff --git 
a/log4j-jeromq/src/main/java/org/apache/logging/log4j/jeromq/appender/JeroMqAppender.java
 
b/log4j-jeromq/src/main/java/org/apache/logging/log4j/jeromq/appender/JeroMqAppender.java
index c28021a82f..98047a0cff 100644
--- 
a/log4j-jeromq/src/main/java/org/apache/logging/log4j/jeromq/appender/JeroMqAppender.java
+++ 
b/log4j-jeromq/src/main/java/org/apache/logging/log4j/jeromq/appender/JeroMqAppender.java
@@ -172,6 +172,11 @@ public final class JeroMqAppender extends AbstractAppender 
{
         sendRcTrue = sendRcFalse = 0;
     }
 
+    // not public, handy for testing
+    byte[] recv(int timeoutMs) {
+        return manager.recv(timeoutMs);
+    }
+
     @Override
     public String toString() {
         return "JeroMqAppender{" +
diff --git 
a/log4j-jeromq/src/main/java/org/apache/logging/log4j/jeromq/appender/JeroMqManager.java
 
b/log4j-jeromq/src/main/java/org/apache/logging/log4j/jeromq/appender/JeroMqManager.java
index e50972df03..51aaf1b840 100644
--- 
a/log4j-jeromq/src/main/java/org/apache/logging/log4j/jeromq/appender/JeroMqManager.java
+++ 
b/log4j-jeromq/src/main/java/org/apache/logging/log4j/jeromq/appender/JeroMqManager.java
@@ -26,6 +26,7 @@ import org.apache.logging.log4j.core.appender.ManagerFactory;
 import org.apache.logging.log4j.core.util.Cancellable;
 import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
 import org.apache.logging.log4j.util.PropertiesUtil;
+import org.zeromq.SocketType;
 import org.zeromq.ZMQ;
 
 /**
@@ -60,7 +61,7 @@ public class JeroMqManager extends AbstractManager {
 
         final boolean enableShutdownHook = 
PropertiesUtil.getProperties().getBooleanProperty(
             SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
-        if (enableShutdownHook) {
+        if (enableShutdownHook && LogManager.getFactory() instanceof 
ShutdownCallbackRegistry) {
             SHUTDOWN_HOOK = ((ShutdownCallbackRegistry) 
LogManager.getFactory()).addShutdownCallback(CONTEXT::close);
         } else {
             SHUTDOWN_HOOK = null;
@@ -71,7 +72,7 @@ public class JeroMqManager extends AbstractManager {
 
     private JeroMqManager(final String name, final JeroMqConfiguration config) 
{
         super(null, name);
-        publisher = CONTEXT.socket(ZMQ.PUB);
+        publisher = CONTEXT.socket(SocketType.XPUB);
         publisher.setAffinity(config.affinity);
         publisher.setBacklog(config.backlog);
         publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
@@ -104,6 +105,17 @@ public class JeroMqManager extends AbstractManager {
         return publisher.send(data);
     }
 
+    // not public, handy for testing
+    byte[] recv(int timeoutMs) {
+        int oldTimeoutMs = publisher.getReceiveTimeOut();
+        try {
+            publisher.setReceiveTimeOut(timeoutMs);
+            return publisher.recv();
+        } finally {
+            publisher.setReceiveTimeOut(oldTimeoutMs);
+        }
+    }
+
     @Override
     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
         publisher.close();
diff --git 
a/log4j-jeromq/src/test/java/org/apache/logging/log4j/jeromq/appender/JeroMqAppenderTest.java
 
b/log4j-jeromq/src/test/java/org/apache/logging/log4j/jeromq/appender/JeroMqAppenderTest.java
index e582559281..aafe354a5d 100644
--- 
a/log4j-jeromq/src/test/java/org/apache/logging/log4j/jeromq/appender/JeroMqAppenderTest.java
+++ 
b/log4j-jeromq/src/test/java/org/apache/logging/log4j/jeromq/appender/JeroMqAppenderTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.logging.log4j.ThreadContext;
 import org.apache.logging.log4j.core.Logger;
@@ -31,13 +32,12 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import static org.assertj.core.api.Assertions.fail;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.fail;
 
 @Tag("zeromq")
-@Tag("sleepy")
-@Timeout(value = 200)
+@Timeout(value = 10, unit = TimeUnit.SECONDS)
 @LoggerContextSource(value = "JeroMqAppenderTest.xml", timeout = 60)
 public class JeroMqAppenderTest {
 
@@ -61,7 +61,7 @@ public class JeroMqAppenderTest {
         final ExecutorService executor = Executors.newSingleThreadExecutor();
         try {
             final Future<List<String>> future = executor.submit(client);
-            Thread.sleep(100);
+            waitForSubscription(appender, 1000);
             appender.resetSendRcs();
             logger.info("Hello");
             logger.info("Again");
@@ -87,7 +87,7 @@ public class JeroMqAppenderTest {
         final ExecutorService executor = Executors.newSingleThreadExecutor();
         try {
             final Future<List<String>> future = executor.submit(client);
-            Thread.sleep(100);
+            waitForSubscription(appender, 1000);
             appender.resetSendRcs();
             final ExecutorService fixedThreadPool = 
Executors.newFixedThreadPool(nThreads);
             for (int i = 0; i < 10.; i++) {
@@ -129,4 +129,15 @@ public class JeroMqAppenderTest {
         assertEquals(2, appender.getSendRcTrue());
         assertEquals(0, appender.getSendRcFalse());
     }
+
+    private void waitForSubscription(JeroMqAppender appender, int timeoutMs) 
throws Exception {
+        final long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < timeoutMs) {
+            byte[] msg = appender.recv(timeoutMs);
+            if (msg != null && msg.length > 0 && msg[0] == 1) {
+                return;
+            }
+        }
+        throw new TimeoutException();
+    }
 }

Reply via email to