This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 1de10671f8 ARTEMIS-3609 Do not use netty thread for thread completion listener
1de10671f8 is described below
commit 1de10671f8faf97f325bdbcfcd98e6d73e2a5f91
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Dec 15 09:19:32 2022 -0500
ARTEMIS-3609 Do not use netty thread for thread completion listener
---
.../activemq/artemis/core/client/impl/ClientProducerImpl.java | 2 +-
.../activemq/artemis/core/client/impl/ClientSessionImpl.java | 5 +++++
.../artemis/core/client/impl/ClientSessionInternal.java | 4 ++++
.../core/client/impl/SendAcknowledgementHandlerWrapper.java | 11 +++++++++--
.../artemis/tests/integration/client/JMSTransactionTest.java | 6 ++----
5 files changed, 21 insertions(+), 7 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 25e5f7a372..b6dad465fd 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -136,7 +136,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
checkClosed();
if (handler != null) {
- handler = new SendAcknowledgementHandlerWrapper(handler);
+ handler = new SendAcknowledgementHandlerWrapper(handler, session.getSessionExecutor());
}
doSend(address1, message, handler);
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index dbafef1e03..301faf2f4e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -2222,4 +2222,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
public SessionContext getSessionContext() {
return sessionContext;
}
+
+ @Override
+ public Executor getSessionExecutor() {
+ return executor;
+ }
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
index 173087d9cc..59c68201ea 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.client.impl;
+import java.util.concurrent.Executor;
+
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -135,4 +137,6 @@ public interface ClientSessionInternal extends ClientSession {
boolean isWritable(ReadyListener callback);
SessionContext getSessionContext();
+
+ Executor getSessionExecutor();
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java
index f98421a60b..931f3868a4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java
@@ -16,8 +16,11 @@
*/
package org.apache.activemq.artemis.core.client.impl;
+import java.util.concurrent.Executor;
+
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.utils.actors.Actor;
public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHandler {
@@ -31,15 +34,19 @@ public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHan
*/
private volatile boolean active = true;
- public SendAcknowledgementHandlerWrapper(SendAcknowledgementHandler wrapped) {
+ private final Actor<Message> messageActor;
+
+ public SendAcknowledgementHandlerWrapper(SendAcknowledgementHandler wrapped, Executor executor) {
this.wrapped = wrapped;
+ messageActor = new Actor<>(executor, wrapped::sendAcknowledged);
}
+
@Override
public void sendAcknowledged(Message message) {
if (active) {
try {
- wrapped.sendAcknowledged(message);
+ messageActor.act(message);
} finally {
active = false;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java
index 0f8ab8b2f5..d6e12b984a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java
@@ -23,7 +23,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
-import org.junit.Assert;
import org.junit.Test;
import javax.jms.CompletionListener;
@@ -33,7 +32,6 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class JMSTransactionTest extends JMSTestBase {
@@ -71,7 +69,7 @@ public class JMSTransactionTest extends JMSTestBase {
@Override
public void onCompletion(Message message) {
try {
- commitLatch.await(100, TimeUnit.MILLISECONDS); // can't block the netty thread. We will delay things, but can't block it otherwise the test just blocks
+ commitLatch.await();
sentMessages.incrementAndGet();
} catch (Exception e) {
e.printStackTrace();
@@ -86,7 +84,7 @@ public class JMSTransactionTest extends JMSTestBase {
}
session.commit();
- Assert.assertEquals(messages, sentMessages.get());
+ Wait.assertEquals(messages, sentMessages::get);
org.apache.activemq.artemis.core.server.Queue queueView = server.locateQueue(SimpleString.toSimpleString(queueName));
Wait.assertEquals(messages, queueView::getMessageCount);