This is an automated email from the ASF dual-hosted git repository.

pkarwasz pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git


The following commit(s) were added to refs/heads/2.x by this push:
     new f1b928cba5 Improves stability of JeroMqAppenderTest
f1b928cba5 is described below

commit f1b928cba5e6aec6ef3fe6e0410c2b12235595c9
Author: Piotr P. Karwasz <[email protected]>
AuthorDate: Thu Mar 23 09:13:56 2023 +0100

    Improves stability of JeroMqAppenderTest
    
    The test needs to wait for the JeroMqAppender to receive the
    subscription request from the client.
    
    We change the socket type from PUB to XPUB to be able to receive
    subscriptions.
---
 .../appender/mom/jeromq/JeroMqAppenderTest.java     | 21 ++++++++++++++++-----
 .../core/appender/mom/jeromq/JeroMqAppender.java    |  6 +++++-
 .../core/appender/mom/jeromq/JeroMqManager.java     | 15 +++++++++++++--
 3 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
index c3cfac08f7..2de2102efe 100644
--- a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
+++ b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.ThreadContext;
@@ -32,13 +33,12 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import static org.assertj.core.api.Assertions.fail;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.fail;
 
 @Tag("zeromq")
-@Tag("sleepy")
-@Timeout(value = 200)
+@Timeout(value = 10, unit = TimeUnit.SECONDS)
 @LoggerContextSource(value = "JeroMqAppenderTest.xml", timeout = 60)
 public class JeroMqAppenderTest {
 
@@ -63,7 +63,7 @@ public class JeroMqAppenderTest {
         final ExecutorService executor = Executors.newSingleThreadExecutor();
         try {
             final Future<List<String>> future = executor.submit(client);
-            Thread.sleep(100);
+            waitForSubscription(appender, 1000);
             appender.resetSendRcs();
             logger.info("Hello");
             logger.info("Again");
@@ -90,7 +90,7 @@ public class JeroMqAppenderTest {
         final ExecutorService executor = Executors.newSingleThreadExecutor();
         try {
             final Future<List<String>> future = executor.submit(client);
-            Thread.sleep(100);
+            waitForSubscription(appender, 1000);
             appender.resetSendRcs();
             final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads);
             for (int i = 0; i < 10.; i++) {
@@ -133,4 +133,15 @@ public class JeroMqAppenderTest {
         assertEquals(2, appender.getSendRcTrue());
         assertEquals(0, appender.getSendRcFalse());
     }
+
+    private void waitForSubscription(JeroMqAppender appender, int timeoutMs) throws Exception {
+        final long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < timeoutMs) {
+            byte[] msg = appender.recv(timeoutMs);
+            if (msg != null && msg.length > 0 && msg[0] == 1) {
+                return;
+            }
+        }
+        throw new TimeoutException();
+    }
 }
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
index ef8593fb0d..d40a3f2f55 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
@@ -14,7 +14,6 @@
  * See the license for the specific language governing permissions and
  * limitations under the license.
  */
-
 package org.apache.logging.log4j.core.appender.mom.jeromq;
 
 import java.io.Serializable;
@@ -173,6 +172,11 @@ public final class JeroMqAppender extends AbstractAppender {
         sendRcTrue = sendRcFalse = 0;
     }
 
+    // not public, handy for testing
+    byte[] recv(int timeoutMs) {
+        return manager.recv(timeoutMs);
+    }
+
     @Override
     public String toString() {
         return "JeroMqAppender{" +
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
index 2d20a53bd5..e9deb8173f 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
@@ -14,7 +14,6 @@
  * See the license for the specific language governing permissions and
  * limitations under the license.
  */
-
 package org.apache.logging.log4j.core.appender.mom.jeromq;
 
 import java.util.Arrays;
@@ -27,6 +26,7 @@ import org.apache.logging.log4j.core.appender.ManagerFactory;
 import org.apache.logging.log4j.core.util.Cancellable;
 import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
 import org.apache.logging.log4j.util.PropertiesUtil;
+import org.zeromq.SocketType;
 import org.zeromq.ZMQ;
 
 /**
@@ -72,7 +72,7 @@ public class JeroMqManager extends AbstractManager {
 
     private JeroMqManager(final String name, final JeroMqConfiguration config) {
         super(null, name);
-        publisher = CONTEXT.socket(ZMQ.PUB);
+        publisher = CONTEXT.socket(SocketType.XPUB);
         publisher.setAffinity(config.affinity);
         publisher.setBacklog(config.backlog);
         publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
@@ -105,6 +105,17 @@ public class JeroMqManager extends AbstractManager {
         return publisher.send(data);
     }
 
+    // not public, handy for testing
+    byte[] recv(int timeoutMs) {
+        int oldTimeoutMs = publisher.getReceiveTimeOut();
+        try {
+            publisher.setReceiveTimeOut(timeoutMs);
+            return publisher.recv();
+        } finally {
+            publisher.setReceiveTimeOut(oldTimeoutMs);
+        }
+    }
+
     @Override
     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
         publisher.close();

Reply via email to