This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 8bb7f0c642 ARTEMIS-5379 Delay journal transaction until page data is stored
8bb7f0c642 is described below
commit 8bb7f0c642f1f2609f88252d9e6c091927a0cc81
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Apr 2 10:28:16 2025 -0400
ARTEMIS-5379 Delay journal transaction until page data is stored
This fixes a regression introduced by ARTEMIS-5305
(24d1bbe603cadb6666a7992e296e6f94ae68e3a1): the commit record could
reach the journal before the paged data.
This commit delays writing the transaction's commit (or prepare) record to
the journal until all the data for the transaction has reached storage.
---
.../artemis/core/paging/impl/PageTimedWriter.java | 9 +
.../artemis/core/transaction/Transaction.java | 11 +
.../core/transaction/impl/TransactionImpl.java | 135 +++++++-
.../resources/servers/validate-page-tx/broker.xml | 110 +++++++
.../tests/soak/paging/ValidatePageTXTest.java | 357 +++++++++++++++++++++
.../core/paging/impl/PageTimedWriterUnitTest.java | 70 +++-
.../core/postoffice/impl/BindingsImplTest.java | 10 +
7 files changed, 688 insertions(+), 14 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
index 78d9795d97..bbeba5e849 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
@@ -118,6 +118,10 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
}
int credits = Math.min(message.getEncodeSize() +
PageReadWriter.SIZE_RECORD, maxCredits);
writeCredits.acquireUninterruptibly(credits);
+ if (tx != null) {
+ // this will delay the commit record until the portion of this task
has been completed
+ tx.delay();
+ }
synchronized (this) {
final boolean replicated = storageManager.isReplicated();
PageEvent event = new PageEvent(context, message, tx, listCtx,
credits, replicated);
@@ -176,6 +180,11 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
if (requireSync) {
performSync();
}
+ for (PageEvent event : pendingEvents) {
+ if (event.tx != null) {
+ event.tx.delayDone();
+ }
+ }
} catch (Exception e) {
logger.warn(e.getMessage(), e);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
index deac731375..716c80fe0a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
@@ -115,4 +115,15 @@ public interface Transaction {
default boolean isAllowPageTransaction() {
return true;
}
+
+   /**
+    * Defers storing the commit or prepare record of this transaction until a matching {@link #delayDone()} is called.
+    * <p>
+    * Calls are counted: every call to {@code delay()} must be balanced by exactly one call to {@link #delayDone()}.
+    * A commit or prepare requested while any delay is outstanding is stored only when the count returns to zero.
+    */
+   void delay();
+
+   /**
+    * Signals that one deferred portion registered through {@link #delay()} has completed (for example, paged data
+    * has reached storage). When the last outstanding portion completes, any commit or prepare record that was held
+    * back in the meantime is stored.
+    */
+   void delayDone();
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 03c08ebf1a..83bf5616c2 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -29,6 +29,7 @@ import
org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import
org.apache.activemq.artemis.api.core.ActiveMQTransactionTimeoutException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
@@ -75,6 +76,10 @@ public class TransactionImpl implements Transaction {
private Runnable afterWired;
+ private int delayed;
+
+ private Runnable delayedRunnable;
+
@Override
public boolean isAsync() {
return async;
@@ -125,9 +130,6 @@ public class TransactionImpl implements Transaction {
this.timeoutSeconds = timeoutSeconds;
}
- // Transaction implementation
- // -----------------------------------------------------------
-
@Override
public boolean isEffective() {
return state == State.PREPARED || state == State.COMMITTED || state ==
State.ROLLEDBACK;
@@ -217,7 +219,11 @@ public class TransactionImpl implements Transaction {
beforePrepare();
- storageManager.prepare(id, xid);
+ if (delayed > 0) {
+ delayedRunnable = new DelayedPrepare(id, xid);
+ } else {
+ storageManager.prepare(id, xid);
+ }
state = State.PREPARED;
// We use the Callback even for non persistence
@@ -340,16 +346,106 @@ public class TransactionImpl implements Transaction {
}
}
- protected void doCommit() throws Exception {
- if (containsPersistent || xid != null && state == State.PREPARED) {
- // ^^ These are the scenarios where we require a storage.commit
- // for anything else we won't use the journal
+
+   // Deferred commit/prepare records: each task stores its record and then completes the caller's OperationContext
+   /**
+    * Base class for a commit or prepare record whose storage was deferred through {@link TransactionImpl#delay()}.
+    * <p>
+    * On construction it captures the caller's {@link OperationContext} and lines up a pending store on it, so the
+    * caller's completion callbacks wait for this task. When {@link #run()} executes, the record is written under a
+    * dedicated single-thread context and the captured context is completed (or failed) once that write is done.
+    */
+   abstract class DelayedRunnable implements Runnable {
+
+      /** Context of the original caller; receives done/onError once the record has been stored. */
+      final OperationContext parentContext;
+
+      /** Context installed on the storage manager while the deferred record is written. */
+      final OperationContext storageContext;
+
+      /** ID of the transaction whose record is stored. Subclasses must use this field, not redeclare it. */
+      final long id;
+
+      DelayedRunnable(long id) {
+         this.parentContext = storageManager.getContext();
+         // balanced by parentContext.done()/onError() in run()
+         this.parentContext.storeLineUp();
+         this.storageContext = storageManager.newSingleThreadContext();
+         this.id = id;
+      }
+
+      /** Writes the actual record; invoked while {@link #storageContext} is the storage manager's current context. */
+      protected abstract void actualRun() throws Exception;
+
+      @Override
+      public void run() {
+         // remember whatever context this thread had, so it is left exactly as it was found
+         final OperationContext previousContext = storageManager.getContext();
+         try {
+            storageManager.setContext(storageContext);
+            actualRun();
+            storageContext.executeOnCompletion(new IOCallback() {
+               @Override
+               public void done() {
+                  parentContext.done();
+               }
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+                  parentContext.onError(errorCode, errorMessage);
+               }
+            });
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            parentContext.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
+         } finally {
+            storageManager.setContext(previousContext);
+         }
+      }
+   }
+
+ /**
+  * Deferred storage of the commit record: asyncCommit when {@code async} is set, commit otherwise.
+  */
+ class DelayedCommit extends DelayedRunnable {
+ DelayedCommit(long id) {
+ super(id);
+ }
+
+ @Override
+ protected void actualRun() throws Exception {
if (async) {
storageManager.asyncCommit(id);
} else {
storageManager.commit(id);
}
}
+ }
+
+
+   /**
+    * Deferred storage of the prepare record of an XA transaction.
+    * <p>
+    * Uses the {@code id} inherited from {@link DelayedRunnable}. Do not redeclare {@code id} here: a field declared
+    * in this class would hide the inherited one, and since only {@code super(id)} assigns it, the prepare record
+    * would be stored with id 0.
+    */
+   class DelayedPrepare extends DelayedRunnable {
+      final Xid xid;
+
+      DelayedPrepare(long id, Xid xid) {
+         super(id);
+         this.xid = xid;
+      }
+
+      @Override
+      protected void actualRun() throws Exception {
+         storageManager.prepare(id, xid);
+      }
+   }
+
+ /**
+  * Stores the commit record when one is required, or defers it while delayed work (such as paged data) is pending.
+  */
+ protected void doCommit() throws Exception {
+ // A commit record is only stored when the TX had persistent data, or when it is an XA TX that was prepared.
+ // The condition below lists exactly those scenarios.
+ if (containsPersistent || xid != null && state == State.PREPARED) {
+ // The caller of this method holds the lock on timeoutLock, which also guards the delayed counter.
+ // While delay() calls are outstanding the record is deferred and stored by the last delayDone().
+ if (delayed > 0) {
+ delayedRunnable = new DelayedCommit(id);
+ } else {
if (async) {
storageManager.asyncCommit(id);
} else {
storageManager.commit(id);
}
+ }
+ }
state = State.COMMITTED;
}
@@ -511,6 +607,29 @@ public class TransactionImpl implements Transaction {
}
}
+   /**
+    * Registers one more pending portion of work for this transaction (PageTimedWriter calls this per paged message).
+    * While any portion is pending, commit and prepare records are held back instead of being stored.
+    */
+   @Override
+   public void delay() {
+      synchronized (timeoutLock) {
+         delayed += 1;
+      }
+   }
+
+   /**
+    * Marks one pending portion registered by {@link #delay()} as complete. When no portions remain, a commit or
+    * prepare record deferred in the meantime is stored right away, on the calling thread, while holding timeoutLock.
+    * <p>
+    * Unbalanced calls are clamped: the counter never goes below zero. Otherwise an extra call would make a later
+    * {@link #delay()} ineffective and let a commit record reach the journal ahead of its paged data.
+    */
+   @Override
+   public void delayDone() {
+      synchronized (timeoutLock) {
+         if (delayed <= 0) {
+            logger.warn("delayDone called without a matching delay on transaction {}", id);
+            delayed = 0;
+         } else {
+            delayed--;
+         }
+         if (delayed == 0 && delayedRunnable != null) {
+            // detach before running so the same deferred record can never be stored twice
+            final Runnable pending = delayedRunnable;
+            delayedRunnable = null;
+            pending.run();
+         }
+      }
+   }
+
+
@Override
public synchronized void addOperation(final TransactionOperation operation)
{
// We do this check, because in the case of XA Transactions and paging,
diff --git
a/tests/soak-tests/src/main/resources/servers/validate-page-tx/broker.xml
b/tests/soak-tests/src/main/resources/servers/validate-page-tx/broker.xml
new file mode 100644
index 0000000000..61ce06f952
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/validate-page-tx/broker.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+--><configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
+
+
+
+ <core xmlns="urn:activemq:core">
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+ <large-messages-directory>./data/largemessages</large-messages-directory>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <!-- a large page-sync-timeout widens the window between the journal commit and the page file sync, making that race easier to reproduce -->
+ <page-sync-timeout>5000000</page-sync-timeout>
+
+ <connectors>
+ <connector name="netty-connector">tcp://localhost:61616</connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
+ </acceptors>
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="amq, guest"/>
+ <permission type="deleteNonDurableQueue" roles="amq, guest"/>
+ <permission type="createDurableQueue" roles="amq, guest"/>
+ <permission type="deleteDurableQueue" roles="amq, guest"/>
+ <permission type="createAddress" roles="amq, guest"/>
+ <permission type="deleteAddress" roles="amq, guest"/>
+ <permission type="consume" roles="amq, guest"/>
+ <permission type="browse" roles="amq, guest"/>
+ <permission type="send" roles="amq, guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="amq, guest"/>
+ </security-setting>
+ </security-settings>
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- max-size-messages=1 makes the address start paging almost immediately -->
+ <max-size-messages>1</max-size-messages>
+ <max-size-bytes>10MB</max-size-bytes>
+ <page-size-bytes>1MB</page-size-bytes>
+
+ <max-read-page-messages>-1</max-read-page-messages>
+ <max-read-page-bytes>20M</max-read-page-bytes>
+ <prefetch-page-bytes>2MB</prefetch-page-bytes>
+
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="exampleQueue">
+ <anycast>
+ <queue name="exampleQueue"/>
+ </anycast>
+ </address>
+ </addresses>
+ </core>
+</configuration>
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/ValidatePageTXTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/ValidatePageTXTest.java
new file mode 100644
index 0000000000..57356c421d
--- /dev/null
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/ValidatePageTXTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.soak.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
+import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ValidatePageTXTest extends SoakTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static final String SERVER_NAME_0 = "validate-page-tx";
+
+   @BeforeAll
+   public static void createServers() throws Exception {
+      File serverLocation = getFileServerLocation(SERVER_NAME_0);
+      deleteDirectory(serverLocation);
+
+      HelperCreate cliCreateServer = helperCreate();
+      cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
+      cliCreateServer.setConfiguration("./src/main/resources/servers/validate-page-tx");
+      cliCreateServer.createServer();
+   }
+
+   private static Process server0;
+
+   @BeforeEach
+   public void before() throws Exception {
+      cleanupData(SERVER_NAME_0);
+      disableCheckThread();
+   }
+
+   @AfterEach
+   @Override
+   public void after() throws Exception {
+      super.after();
+   }
+
+   /**
+    * This test is validating if DuplicateIDs are stored atomically with the pageCommit and the page writes.
+    * At the time this test was written, the commit record was reaching the journal before the page data
+    * due to the async nature added by
+    * <a href="https://github.com/apache/activemq-artemis/commit/24d1bbe603cadb6666a7992e296e6f94ae68e3a1">ARTEMIS-5305</a>
+    */
+   @Test
+   public void testValidatePageTX() throws Exception {
+      final String coreURI = "tcp://localhost:61616?callTimeout=1000";
+      final String amqpURI = "tcp://localhost:61616";
+      final int SEND_GROUPS = 5;
+      final int GROUP_SIZE = 3; // XA Core, Regular TX Core, AMQP TX
+      final int BODY_SIZE = 200 * 1024;
+      AtomicBoolean running = new AtomicBoolean(true);
+      ExecutorService executorService = Executors.newFixedThreadPool(SEND_GROUPS * GROUP_SIZE);
+      runAfter(executorService::shutdownNow);
+
+      runAfter(() -> running.set(false));
+
+      String largeBody;
+
+      {
+         StringBuilder builder = new StringBuilder();
+         while (builder.length() < BODY_SIZE) {
+            builder.append("This is a large body ");
+         }
+         largeBody = builder.toString();
+      }
+
+      server0 = startServer(SERVER_NAME_0, 0, 30000);
+
+      AtomicInteger sequenceGenerator = new AtomicInteger(1);
+
+      ConcurrentHashSet<String> dupList = new ConcurrentHashSet<>();
+
+      CountDownLatch enoughSent = new CountDownLatch(SEND_GROUPS * GROUP_SIZE * 2);
+      CountDownLatch latchDone = new CountDownLatch(SEND_GROUPS * GROUP_SIZE);
+      AtomicBoolean waitAfterError = new AtomicBoolean(true);
+
+      CyclicBarrier startFlag = new CyclicBarrier(SEND_GROUPS * GROUP_SIZE + 1);
+      CyclicBarrier errorCaptureFlag = new CyclicBarrier(SEND_GROUPS * GROUP_SIZE + 1);
+
+      int threadCounts = 0;
+
+      // instead of running this test multiple times for multiple configurations
+      // I'm starting a few possible configurations of the clients:
+      // Core (XA and Non XA) and AMQP (non XA of course)
+      for (int i = 0; i < SEND_GROUPS; i++) {
+         String threadID = "CoreThread" + (threadCounts++);
+         executorService.execute(() -> {
+            sender(false, "core", coreURI, threadID, startFlag, errorCaptureFlag, waitAfterError, sequenceGenerator, running, largeBody, dupList, enoughSent, latchDone);
+         });
+      }
+      for (int i = 0; i < SEND_GROUPS; i++) {
+         String threadID = "XAThread" + (threadCounts++);
+         executorService.execute(() -> {
+            sender(true, "core", coreURI, threadID, startFlag, errorCaptureFlag, waitAfterError, sequenceGenerator, running, largeBody, dupList, enoughSent, latchDone);
+         });
+      }
+      for (int i = 0; i < SEND_GROUPS; i++) {
+         String threadID = "AMQPThread" + (threadCounts++);
+         executorService.execute(() -> {
+            sender(false, "AMQP", amqpURI, threadID, startFlag, errorCaptureFlag, waitAfterError, sequenceGenerator, running, largeBody, dupList, enoughSent, latchDone);
+         });
+      }
+
+      logger.debug("Start flag waiting");
+      startFlag.await(30, TimeUnit.SECONDS);
+
+      assertTrue(enoughSent.await(100, TimeUnit.SECONDS));
+      logger.debug("Enough messages sent, the server will now be stopped");
+
+      // we will stop twice, once with kill, once with a regular stop
+      // this is to avoid writing multiple tests for each scenario.
+      for (int errorLoop = 0; errorLoop < 2; errorLoop++) {
+         if (errorLoop == 0) {
+            logger.debug("server will be killed (destroyForcibily)");
+            server0.destroyForcibly();
+         } else {
+            logger.debug("Server will be stopped with a file");
+            stopServerWithFile(getServerLocation(SERVER_NAME_0));
+         }
+         assertTrue(server0.waitFor(10, TimeUnit.SECONDS));
+         logger.debug("Waiting aligned flags for error part...");
+         // everybody should be aligned in the barrier after the error
+         // this is because I want to scan for prepared transactions and commit them
+         errorCaptureFlag.await(30, TimeUnit.SECONDS);
+
+         logger.debug("restarting server...");
+         server0 = startServer(SERVER_NAME_0, 0, 60_000);
+         logger.debug("Started servers, doing recovery");
+         fakeXARecovery();
+
+         if (errorLoop == 1) {
+            // no more error alignment after this condition
+            waitAfterError.set(false);
+         }
+
+         // aligning again to restart
+         logger.debug("Re-alining the start to resume");
+         startFlag.await(30, TimeUnit.SECONDS);
+      }
+
+      logger.debug("Waiting some time");
+      Thread.sleep(100);
+      running.set(false);
+
+      logger.debug("running set to false");
+      assertTrue(latchDone.await(300, TimeUnit.SECONDS));
+
+      logger.debug("latch done is done.. shutting down executor");
+
+      executorService.shutdownNow();
+      assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         connection.start();
+         MessageConsumer consumer = session.createConsumer(session.createQueue("exampleQueue"));
+         while (!dupList.isEmpty()) {
+            TextMessage message = (TextMessage) consumer.receive(10_000);
+            if (message == null) {
+               break;
+            }
+            String myid = message.getStringProperty("myID");
+            logger.debug("Received dupID={}", myid);
+            assertNotNull(message);
+            assertEquals(largeBody, message.getText());
+            if (!dupList.remove(myid)) {
+               logger.debug("Could not find {}", myid);
+               fail("Could not find " + myid);
+            }
+         }
+         session.rollback();
+         if (!dupList.isEmpty()) {
+            logger.warn("Messages that were still in the dupList");
+            for (String missedDuplicate : dupList) {
+               logger.warn("Missed duplicate dupID={}", missedDuplicate);
+            }
+         }
+         assertTrue(dupList.isEmpty());
+      }
+
+   }
+
+   // it will scan any pending prepares and commit them
+   private void fakeXARecovery() throws Exception {
+      XAConnectionFactory factory = (XAConnectionFactory) CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+      try (XAConnection connection = factory.createXAConnection()) {
+         XASession session = connection.createXASession();
+         Xid[] recoveredXids = session.getXAResource().recover(XAResource.TMSTARTRSCAN);
+         if (recoveredXids != null && recoveredXids.length != 0) {
+            for (Xid xid : recoveredXids) {
+               logger.debug("prepared XA!!!!");
+               session.getXAResource().commit(xid, false);
+            }
+         }
+         assertEquals(0, session.getXAResource().recover(XAResource.TMENDRSCAN).length);
+      }
+   }
+
+   /**
+    * Sends messages in a loop until {@code running} is cleared. Every message carries a unique duplicate-detection
+    * ID. IDs are added to {@code dupList} when they were committed, prepared (so they may be committed by
+    * {@link #fakeXARecovery()}), or rejected as duplicates (so they were already stored).
+    * <p>
+    * A connection is created lazily and reused. On an unexpected error it is closed on a best-effort basis and
+    * discarded, and the same ID is retried on a new connection.
+    */
+   private void sender(boolean useXA,
+                       String protocol,
+                       String uri,
+                       String threadID,
+                       CyclicBarrier startFlag,
+                       CyclicBarrier errorCaptureFlag,
+                       AtomicBoolean waitAfterError,
+                       AtomicInteger sequenceGenerator,
+                       AtomicBoolean running,
+                       String messageBody,
+                       ConcurrentHashSet<String> dupList,
+                       CountDownLatch enoughSent,
+                       CountDownLatch latchDone) {
+      try {
+         Connection connection = null;
+         Session session = null;
+         XASession xasession = null;
+         MessageProducer producer = null;
+         String dupID = String.valueOf(sequenceGenerator.incrementAndGet());
+         boolean firstTime = true;
+         while (running.get()) {
+            try {
+               // (re)connect only when there is no session yet, or after it was discarded on an error;
+               // keying this on the session (not on a factory) makes the XA path reuse its connection too
+               if (session == null) {
+                  if (useXA) {
+                     XAConnectionFactory xaConnectionFactory = (XAConnectionFactory) CFUtil.createConnectionFactory(protocol, uri);
+                     XAConnection xaConnection = xaConnectionFactory.createXAConnection();
+                     connection = xaConnection;
+                     xasession = xaConnection.createXASession();
+                     session = xasession;
+                  } else {
+                     // NOTE(review): the non-XA path always uses "CORE" and ignores protocol, so the "AMQP" senders
+                     // also run over CORE; confirm whether that is intended before switching it to protocol
+                     ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", uri);
+                     connection = factory.createConnection();
+                     session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                  }
+                  producer = session.createProducer(session.createQueue("exampleQueue"));
+               }
+               Xid xid = null;
+               if (useXA) {
+                  xid = newXID();
+                  xasession.getXAResource().start(xid, XAResource.TMNOFLAGS);
+               }
+               TextMessage message = session.createTextMessage(messageBody);
+               message.setStringProperty("myID", dupID);
+               message.setStringProperty("_AMQ_DUPL_ID", dupID);
+               logger.debug("sending dupID={}, threadID={}", dupID, threadID);
+               if (firstTime) {
+                  firstTime = false;
+                  logger.debug("Thread {} waiting a start flag", threadID);
+                  startFlag.await(10, TimeUnit.SECONDS);
+                  logger.debug("Thread {} received the start flag", threadID);
+               }
+               producer.send(message);
+               if (useXA) {
+                  xasession.getXAResource().end(xid, XAResource.TMSUCCESS);
+                  xasession.getXAResource().prepare(xid);
+                  // once prepared, fakeXARecovery may commit it even if the commit below fails
+                  dupList.add(dupID);
+                  xasession.getXAResource().commit(xid, false);
+               } else {
+                  session.commit();
+                  dupList.add(dupID);
+               }
+               logger.debug("sending OK dupID={}, threadID={}", dupID, threadID);
+               dupID = String.valueOf(sequenceGenerator.incrementAndGet());
+               enoughSent.countDown();
+            } catch (Throwable e) {
+               logger.debug("error at thread {}, message={}", threadID, e.getMessage(), e);
+               if (waitAfterError.get()) {
+                  try {
+                     // align once before XA recovery
+                     logger.debug("thread {} waiting for errorCaptureFlag", threadID);
+                     errorCaptureFlag.await(30, TimeUnit.SECONDS);
+                     // we align again to restart
+                     logger.debug("thread {} waiting for startFlag on a restart", threadID);
+                     startFlag.await(30, TimeUnit.SECONDS);
+                  } catch (Throwable e2) {
+                     logger.warn("Exception of the exception on {}, message={}", threadID, e2.getMessage(), e2);
+                  }
+               }
+               if (e.getCause() instanceof ActiveMQDuplicateIdException) {
+                  logger.debug("duplicateID exception dupID={} error={}, threadID={}, storing the duplicateID as it is fine", dupID, e.getMessage(), threadID);
+                  dupList.add(dupID);
+                  dupID = String.valueOf(sequenceGenerator.incrementAndGet());
+               } else {
+                  logger.warn("error on dupID={}, error Message={}", dupID, e.getMessage(), e);
+                  // discard (and close) the broken connection; the next iteration re-creates it
+                  closeConnectionQuietly(connection);
+                  connection = null;
+                  xasession = null;
+                  session = null;
+                  producer = null;
+               }
+            }
+         }
+
+         closeConnectionQuietly(connection);
+      } finally {
+         latchDone.countDown();
+      }
+   }
+
+   /** Closes the connection if there is one, ignoring any failure (the broker may already be gone). */
+   private static void closeConnectionQuietly(Connection connection) {
+      if (connection == null) {
+         return;
+      }
+      try {
+         connection.close();
+      } catch (Throwable ignored) {
+         // best effort only: the server may have been killed while this connection was open
+      }
+   }
+}
\ No newline at end of file
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
index 47d40e4889..60c17a960b 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.unit.core.paging.impl;
+import javax.transaction.xa.Xid;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -33,6 +34,7 @@ import
org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagedMessage;
@@ -60,8 +62,10 @@ import
org.apache.activemq.artemis.tests.util.ArtemisTestCase;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -108,6 +112,9 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
Journal mockBindingsJournal;
Journal mockMessageJournal;
+ AtomicInteger numberOfCommitsMessageJournal = new AtomicInteger(0);
+ AtomicInteger numberOfPreparesMessageJournal = new AtomicInteger(0);
+
AtomicBoolean useReplication = new AtomicBoolean(false);
AtomicBoolean returnSynchronizing = new AtomicBoolean(false);
ReplicationManager mockReplicationManager;
@@ -158,6 +165,15 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
return msg;
}
+ private int commitCall() {
+ return numberOfCommitsMessageJournal.incrementAndGet();
+ }
+
+ private int prepareCall() {
+ return numberOfPreparesMessageJournal.incrementAndGet();
+ }
+
+
@BeforeEach
public void setupMocks() throws Exception {
configuration = new ConfigurationImpl();
@@ -174,6 +190,14 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
mockBindingsJournal = Mockito.mock(Journal.class);
mockMessageJournal = Mockito.mock(Journal.class);
+ Mockito.doAnswer(a ->
commitCall()).when(mockMessageJournal).appendCommitRecord(Mockito.anyLong(),
Mockito.anyBoolean());
+ Mockito.doAnswer(a ->
commitCall()).when(mockMessageJournal).appendCommitRecord(Mockito.anyLong(),
Mockito.anyBoolean(), Mockito.any());
+ Mockito.doAnswer(a ->
commitCall()).when(mockMessageJournal).appendCommitRecord(Mockito.anyLong(),
Mockito.anyBoolean(), Mockito.any(), Mockito.anyBoolean());
+
+ Mockito.doAnswer(a ->
prepareCall()).when(mockMessageJournal).appendPrepareRecord(Mockito.anyLong(),
(byte[]) Mockito.any(), Mockito.anyBoolean());
+ Mockito.doAnswer(a ->
prepareCall()).when(mockMessageJournal).appendPrepareRecord(Mockito.anyLong(),
Mockito.any(EncodingSupport.class), Mockito.anyBoolean(), Mockito.any());
+ Mockito.doAnswer(a ->
prepareCall()).when(mockMessageJournal).appendPrepareRecord(Mockito.anyLong(),
Mockito.any(EncodingSupport.class), Mockito.anyBoolean());
+
mockReplicationManager = Mockito.mock(ReplicationManager.class);
Mockito.when(mockReplicationManager.isStarted()).thenAnswer(a ->
useReplication.get());
Mockito.when(mockReplicationManager.isSynchronizing()).thenAnswer(a ->
returnSynchronizing.get());
@@ -323,7 +347,7 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
allowRunning.countDown();
- assertTrue(latch.await(1, TimeUnit.MINUTES));
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
}
@@ -348,7 +372,7 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
allowRunning.countDown();
- assertTrue(latch.await(10, TimeUnit.MINUTES));
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
}
@Test
@@ -373,7 +397,7 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
allowRunning.countDown();
- assertTrue(latch.await(10, TimeUnit.MINUTES));
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
}
@Test
@@ -409,7 +433,7 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
allowSync.countDown();
- assertTrue(latch.await(10, TimeUnit.MINUTES));
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals(numberOfMessages, pageWrites.get());
}
@@ -432,14 +456,48 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
timer.addTask(context, createPagedMessage(), transaction,
Mockito.mock(RouteContextList.class));
+ numberOfCommitsMessageJournal.set(0); // it should been 0 before anyway
but since I have no real reason to require it to be zero before, I'm doing this
just in case it ever changes
transaction.commit();
-
+ Assertions.assertEquals(0, numberOfCommitsMessageJournal.get());
assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
useReplication.set(false);
allowRunning.countDown();
- assertTrue(latch.await(10, TimeUnit.MINUTES));
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ Wait.assertEquals(1, numberOfCommitsMessageJournal::get, 5000, 100);
+ }
+
+
+   @Test
+   public void testTXCompletionPrepare() throws Exception {
+      CountDownLatch latch = new CountDownLatch(1);
+
+      useReplication.set(true);
+
+      TransactionImpl transaction = new TransactionImpl(Mockito.mock(Xid.class), realJournalStorageManager, -1);
+      transaction.setContainsPersistent();
+      transaction.addOperation(new TransactionOperationAbstract() {
+         @Override
+         public void afterPrepare(Transaction tx) {
+            super.afterPrepare(tx);
+            latch.countDown();
+         }
+      });
+
+      timer.addTask(context, createPagedMessage(), transaction, Mockito.mock(RouteContextList.class));
+
+      // both counters should already be 0; reset them anyway so this test does not depend on setup details
+      numberOfCommitsMessageJournal.set(0);
+      numberOfPreparesMessageJournal.set(0);
+      transaction.prepare();
+      // the prepare record must not reach the journal while the paged message is still pending
+      Assertions.assertEquals(0, numberOfCommitsMessageJournal.get());
+      Assertions.assertEquals(0, numberOfPreparesMessageJournal.get());
+      assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
+      allowRunning.countDown();
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      // preparing never writes a commit record; exactly one prepare record is written once the page data is stored
+      Assertions.assertEquals(0, numberOfCommitsMessageJournal.get());
+      Wait.assertEquals(1, numberOfPreparesMessageJournal::get, 5000, 100);
+   }
+
// add a task while replicating, process it when no longer replicating
(disconnect a node scenario)
@Test
public void testDisableReplica() throws Exception {
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index d17ef7759f..0ca3c95dbc 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -296,6 +296,16 @@ public class BindingsImplTest extends ActiveMQTestBase {
public boolean hasTimedOut() {
return false;
}
+
+      @Override
+      public void delay() {
+         // intentionally empty: this fake transaction does not track deferred work
+      }
+
+      @Override
+      public void delayDone() {
+         // intentionally empty: this fake transaction has no deferred commit or prepare to release
+      }
}
private final class FakeFilter implements Filter {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact