This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 8f9a72d257 ARTEMIS-4432 respect actor and operation context for
openwire connection failure processing
8f9a72d257 is described below
commit 8f9a72d257f6c75810a8d354006e1a6f015bf082
Author: Gary Tully <[email protected]>
AuthorDate: Mon Sep 18 16:16:43 2023 +0100
ARTEMIS-4432 respect actor and operation context for openwire connection
failure processing
---
.../artemis/utils/actors/ThresholdActor.java | 16 +++
.../artemis/utils/actors/ThresholdActorTest.java | 47 ++++++++
.../core/protocol/openwire/OpenWireConnection.java | 32 ++++-
.../PrefetchRedeliveryCountOpenwireTest.java | 134 ++++++++++++++++++++-
4 files changed, 226 insertions(+), 3 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
index e0ff665173..4e7bbb6ded 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
@@ -41,6 +41,7 @@ public class ThresholdActor<T> extends ProcessorBase<Object> {
private final ActorListener<T> listener;
private final Runnable overThreshold;
private final Runnable clearThreshold;
+ private volatile Runnable shutdownTask;
public ThresholdActor(Executor parent, ActorListener<T> listener, int
maxSize, ToIntFunction<T> sizeGetter, Runnable overThreshold, Runnable
clearThreshold) {
super(parent);
@@ -53,6 +54,10 @@ public class ThresholdActor<T> extends ProcessorBase<Object>
{
@Override
protected final void doTask(Object task) {
+ if (task == shutdownTask) {
+ shutdownTask.run();
+ return;
+ }
if (task == FLUSH) {
clearThreshold.run();
// should set to 0 no matter the value. There's a single thread setting this value back to zero
@@ -94,4 +99,15 @@ public class ThresholdActor<T> extends ProcessorBase<Object>
{
task(FLUSH);
}
}
+
+ public void shutdown(Runnable runnable) {
+ // do no more pending work
+ tasks.clear();
+ // run this task next
+ shutdownTask = runnable;
+ tasks.add(runnable);
+ // wait for shutdown task to complete
+ flush();
+ shutdown();
+ }
}
diff --git
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
index 85e6ead043..e554d632f7 100644
---
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
+++
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
@@ -184,4 +184,51 @@ public class ThresholdActorTest {
}
+ @Test
+ public void testShutdownTask() throws Exception {
+ AtomicInteger lastAcquireFailed = new AtomicInteger(0);
+ lastProcessed.set(0);
+
+ Semaphore allowedTasks = new Semaphore(10);
+ CountDownLatch completedTasks = new CountDownLatch(11);
+ CountDownLatch pendingTasks = new CountDownLatch(11);
+
+ final ExecutorService executorService =
Executors.newSingleThreadExecutor();
+
+ ThresholdActor<Integer> actor = new ThresholdActor<>(executorService,
(i) -> {
+ try {
+ pendingTasks.countDown();
+ if (allowedTasks.tryAcquire(1, 200, TimeUnit.MILLISECONDS)) {
+ lastProcessed.set(i);
+ } else {
+ lastAcquireFailed.set(i);
+ }
+ completedTasks.countDown();
+ } catch (InterruptedException ignored) {
+ }
+
+ }, 1000, (e) -> {
+ return 1;
+ }, () -> {
+ }, () -> {
+ });
+
+ // expect allowedTasks tasks to complete
+ for (int i = 1; i < 100; i++) {
+ actor.act(i);
+ }
+ // wait for task processing
+ Assert.assertTrue(pendingTasks.await(4, TimeUnit.SECONDS));
+
+ actor.shutdown(() -> {
+ lastProcessed.set(lastProcessed.get() * 1000);
+ });
+
+ Assert.assertTrue(completedTasks.await(4, TimeUnit.SECONDS));
+
+ // assert processing terminated at block point
+ Assert.assertEquals(10000, lastProcessed.get());
+ // pending task executed as expected
+ Assert.assertEquals(11, lastAcquireFailed.get());
+ }
}
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index dc35b6b1ea..0956c2adca 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -681,6 +681,24 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
return this.inWireFormat;
}
+ private void rollbackInProgressLocalTransactions() {
+
+ for (Transaction tx : txMap.values()) {
+ AMQSession session = (AMQSession) tx.getProtocolData();
+ if (session != null) {
+ session.getCoreSession().resetTX(tx);
+ try {
+ session.getCoreSession().rollback(false);
+ } catch (Exception expectedOnExistingOutcome) {
+ } finally {
+ session.getCoreSession().resetTX(null);
+ }
+ } else {
+ tx.tryRollback();
+ }
+ }
+ }
+
private void shutdown(boolean fail) {
try {
@@ -754,9 +772,19 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
@Override
public void fail(ActiveMQException me, String message) {
- for (Transaction tx : txMap.values()) {
- tx.tryRollback();
+ final ThresholdActor<Command> localVisibleActor = openWireActor;
+ if (localVisibleActor != null) {
+ localVisibleActor.shutdown(() -> doFail(me, message));
+ } else {
+ doFail(me, message);
}
+ }
+
+ private void doFail(ActiveMQException me, String message) {
+
+ recoverOperationContext();
+
+ rollbackInProgressLocalTransactions();
if (me != null) {
//filter it like the other protocols
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
index 9d086a1f54..80d5ca05df 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
@@ -17,28 +17,44 @@
package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.io.PrintStream;
+import java.lang.invoke.MethodHandles;
import java.util.Map;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
@Override
public void setUp() throws Exception {
realStore = true;
@@ -176,4 +192,120 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
}
}
}
+
+ @Test(timeout = 60_000)
+ public void
testExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch() throws
Exception {
+ Connection exConn = null;
+
+ SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+ this.server.createQueue(new
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
+ AtomicInteger batchConsumed = new AtomicInteger(0);
+
+ try {
+ ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+ exFact.setWatchTopicAdvisories(false);
+
+ RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ redeliveryPolicy.setMaximumRedeliveries(4000);
+ exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+ Queue queue = new ActiveMQQueue("exampleQueueTwo");
+
+ exConn = exFact.createConnection();
+
+ exConn.start();
+
+ Session session = exConn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+
+ TextMessage message = session.createTextMessage("This is a text message");
+
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ int numMessages = 600;
+ for (int i = 0; i < numMessages; i++) {
+ message.setIntProperty("SEQ", i);
+ producer.send(message);
+ }
+ session.close();
+ exConn.close();
+
+ final int batch = numMessages;
+ AtomicInteger commits = new AtomicInteger(0);
+ AtomicBoolean done = new AtomicBoolean(false);
+ while (!done.get()) {
+ // connection per batch attempt
+ exConn = exFact.createConnection();
+ ((ActiveMQConnection) exConn).setCloseTimeout(1); // so rollback on close won't block after socket close exception
+
+ exConn.start();
+
+ session = exConn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ TextMessage messageReceived = null;
+ for (int j = 0; j < batch; j++) {
+ messageReceived = (TextMessage) messageConsumer.receive(2000);
+ if (messageReceived == null) {
+ done.set(true);
+ break;
+ }
+ batchConsumed.incrementAndGet();
+ assertEquals("This is a text message",
messageReceived.getText());
+ }
+
+ // arrange concurrent commit - ack/commit
+ // with server side error, potential for ack/commit and close-on-fail to contend
+ final CountDownLatch latch = new CountDownLatch(1);
+ Session finalSession = session;
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ latch.countDown();
+ finalSession.commit();
+ commits.incrementAndGet();
+
+ } catch (JMSException e) {
+ }
+ }
+ });
+
+ latch.await(1, TimeUnit.SECONDS);
+ // force a local socket close such that the broker sees an exception on the connection and fails the consumer via serverConsumer close
+ ((FailoverTransport) ((org.apache.activemq.ActiveMQConnection)
exConn).getTransport().narrow(FailoverTransport.class)).stop();
+ exConn.close();
+ }
+ } finally {
+ if (exConn != null) {
+ exConn.close();
+ }
+ }
+
+ logger.info("Done after: {}, queue: {}", batchConsumed.get(),
server.locateQueue(durableQueue));
+ try {
+ Wait.assertEquals(0L, () ->
server.locateQueue(durableQueue).getDeliveringCount(), 1000);
+ } catch (Throwable e) {
+
+ final AtomicBoolean doOut = new AtomicBoolean(false);
+ PrintStream out = new PrintStream(System.out) {
+
+ @Override
+ public void println(String s) {
+ if (doOut.get()) {
+ super.println(s);
+ } else {
+ if (s.startsWith("### Failed Transactions")) {
+ doOut.set(true);
+ super.println(s);
+ }
+ }
+ }
+ };
+ PrintData.printData(server.getConfiguration().getBindingsLocation(),server.getConfiguration().getJournalLocation(),server.getConfiguration().getPagingLocation(), out, true, true, true, false, -1);
+
+ throw e;
+ }
+ }
+
}