This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 1de10671f8 ARTEMIS-3609 Do not use netty thread for thread completion listener
1de10671f8 is described below

commit 1de10671f8faf97f325bdbcfcd98e6d73e2a5f91
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Dec 15 09:19:32 2022 -0500

    ARTEMIS-3609 Do not use netty thread for thread completion listener
---
 .../activemq/artemis/core/client/impl/ClientProducerImpl.java |  2 +-
 .../activemq/artemis/core/client/impl/ClientSessionImpl.java  |  5 +++++
 .../artemis/core/client/impl/ClientSessionInternal.java       |  4 ++++
 .../core/client/impl/SendAcknowledgementHandlerWrapper.java   | 11 +++++++++--
 .../artemis/tests/integration/client/JMSTransactionTest.java  |  6 ++----
 5 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 25e5f7a372..b6dad465fd 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -136,7 +136,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
       checkClosed();
 
       if (handler != null) {
-         handler = new SendAcknowledgementHandlerWrapper(handler);
+         handler = new SendAcknowledgementHandlerWrapper(handler, session.getSessionExecutor());
       }
 
       doSend(address1, message, handler);
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index dbafef1e03..301faf2f4e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -2222,4 +2222,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    public SessionContext getSessionContext() {
       return sessionContext;
    }
+
+   @Override
+   public Executor getSessionExecutor() {
+      return executor;
+   }
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
index 173087d9cc..59c68201ea 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.client.impl;
 
+import java.util.concurrent.Executor;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -135,4 +137,6 @@ public interface ClientSessionInternal extends ClientSession {
    boolean isWritable(ReadyListener callback);
 
    SessionContext getSessionContext();
+
+   Executor getSessionExecutor();
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java
index f98421a60b..931f3868a4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java
@@ -16,8 +16,11 @@
  */
 package org.apache.activemq.artemis.core.client.impl;
 
+import java.util.concurrent.Executor;
+
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.utils.actors.Actor;
 
 public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHandler {
 
@@ -31,15 +34,19 @@ public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHan
     */
    private volatile boolean active = true;
 
-   public SendAcknowledgementHandlerWrapper(SendAcknowledgementHandler wrapped) {
+   private final Actor<Message> messageActor;
+
+   public SendAcknowledgementHandlerWrapper(SendAcknowledgementHandler wrapped, Executor executor) {
       this.wrapped = wrapped;
+      messageActor = new Actor<>(executor, wrapped::sendAcknowledged);
    }
 
+
    @Override
    public void sendAcknowledged(Message message) {
       if (active) {
          try {
-            wrapped.sendAcknowledged(message);
+            messageActor.act(message);
          } finally {
             active = false;
          }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java
index 0f8ab8b2f5..d6e12b984a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java
@@ -23,7 +23,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
-import org.junit.Assert;
 import org.junit.Test;
 
 import javax.jms.CompletionListener;
@@ -33,7 +32,6 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class JMSTransactionTest extends JMSTestBase {
@@ -71,7 +69,7 @@ public class JMSTransactionTest extends JMSTestBase {
                @Override
                public void onCompletion(Message message) {
                   try {
-                     commitLatch.await(100, TimeUnit.MILLISECONDS); // can't block the netty thread. We will delay things, but can't block it otherwise the test just blocks
+                     commitLatch.await();
                      sentMessages.incrementAndGet();
                   } catch (Exception e) {
                      e.printStackTrace();
@@ -86,7 +84,7 @@ public class JMSTransactionTest extends JMSTestBase {
          }
 
          session.commit();
-         Assert.assertEquals(messages, sentMessages.get());
+         Wait.assertEquals(messages, sentMessages::get);
 
          org.apache.activemq.artemis.core.server.Queue queueView = server.locateQueue(SimpleString.toSimpleString(queueName));
          Wait.assertEquals(messages, queueView::getMessageCount);

Reply via email to