[
https://issues.apache.org/jira/browse/AMQ-8050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237413#comment-17237413
]
Erik Håkansson commented on AMQ-8050:
-------------------------------------
I haven't tried 5.15.x specifically, but I see the same behaviour in Red Hat's
5.11.0.redhat-630475. However, the Red Hat patch builds include a lot of
backports, so that build is not equivalent to Apache 5.11.x, and I don't know
which Apache version it would correspond to.
I have only run the reproduction test against Apache 5.16.0 and Red Hat
5.11.0.redhat-630475.
> XAException when failing over in the middle of a transaction
> ------------------------------------------------------------
>
> Key: AMQ-8050
> URL: https://issues.apache.org/jira/browse/AMQ-8050
> Project: ActiveMQ
> Issue Type: Bug
> Components: JMS client, Network of Brokers
> Affects Versions: 5.16.0
> Reporter: Erik Håkansson
> Assignee: Jean-Baptiste Onofré
> Priority: Major
>
> We have been plagued in production by growing disk usage in KahaDB on our
> ActiveMQ brokers. We have found that this is caused by hanging transactions,
> and the only solution so far has been to restart the broker. The hanging
> transactions occur when we have the occasional network glitch. The network
> is outside our control and not something we can fix.
> However, we have found a workaround. Our clients are MDBs in Wildfly. If we
> disable failover for these and instead let Wildfly handle creating new
> connections, we don't see the issue.
> I have been able to reproduce the error in a unit test. When there is a
> connection disturbance in the middle of a transaction (on the consumer end)
> and the client fails over to another broker in the network, it tries to
> commit the transaction on the new broker.
> This fails with
> {noformat}
> Transaction 'XID:[...]' has not been started. xaErrorCode:-4
> {noformat}
> and the transaction ends up in a weird state on the broker.
> We are not using any replicated persistence adapters, just local kahaDB for
> each broker in the network.
> I'm not sure whether the error is actually in the client, which can't handle
> failover during a transaction, or in the broker, which doesn't distribute the
> transaction properly to the other brokers in the network.
> I'm also very open to the possibility that this is simply a configuration
> error on our end, but if so, I have no idea what.
> I'm adding the unit test where I have reproduced it. I happily admit that I
> don't know much about how transactions actually behave in reality, so I might
> have misconfigured them here, but we see the exact same behaviour in
> production code where transactions are managed by Wildfly.
> {code:java}
> package test;
> import static org.awaitility.Awaitility.await;
> import org.apache.activemq.ActiveMQMessageConsumer;
> import org.apache.activemq.ActiveMQXAConnection;
> import org.apache.activemq.ActiveMQXAConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.broker.TransportConnector;
> import org.apache.activemq.broker.region.policy.PolicyEntry;
> import org.apache.activemq.broker.region.policy.PolicyMap;
> import org.apache.activemq.transport.failover.FailoverTransport;
> import org.junit.Assert;
> import org.junit.Test;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.io.DataOutputStream;
> import java.io.IOException;
> import java.net.URI;
> import java.time.Duration;
> import java.time.Instant;
> import java.time.temporal.ChronoUnit;
> import java.util.Objects;
> import java.util.UUID;
> import java.util.concurrent.atomic.AtomicLong;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.TextMessage;
> import javax.jms.XAConnection;
> import javax.jms.XASession;
> import javax.transaction.xa.XAException;
> import javax.transaction.xa.XAResource;
> import javax.transaction.xa.Xid;
> public class FailoverErrorTest {
> private static final Logger logger =
> LoggerFactory.getLogger(ClusterTest.class);
> private static BrokerService createBroker(boolean
> deleteAllMessagesOnStartup, String bindAddress) throws Exception {
> BrokerService broker = new BrokerService();
> broker.setUseJmx(true);
> broker.setAdvisorySupport(true);
> TransportConnector transportConnector = new TransportConnector();
> transportConnector.setName("openwire");
> transportConnector.setUri(URI.create(bindAddress));
> transportConnector.setRebalanceClusterClients(true);
> transportConnector.setUpdateClusterClientsOnRemove(true);
> broker.addConnector(transportConnector);
> broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
> PolicyMap policyMap = new PolicyMap();
> PolicyEntry defaultEntry = new PolicyEntry();
> policyMap.setDefaultEntry(defaultEntry);
> broker.setDestinationPolicy(policyMap);
> return broker;
> }
> private static Message getMessage(String messageId, ActiveMQMessageConsumer
> consumer) throws JMSException {
> String receivedMessageId = null;
> Instant start = Instant.now();
> while (receivedMessageId == null || !Objects.equals(messageId,
> receivedMessageId)) {
> if (Instant.now().isAfter(start.plus(5, ChronoUnit.SECONDS))) {
> Assert.fail("timeout");
> }
> Message msg = consumer.receive(20000);
> Assert.assertNotNull("Couldn't get message", msg);
> receivedMessageId = msg.getStringProperty("my_id");
> if (!Objects.equals(messageId, receivedMessageId)) {
> logger.info("Got the wrong message. Looping.");
> } else {
> logger.info("Found message");
> return msg;
> }
> }
> return null;
> }
> private static Xid createXid() throws IOException {
> final AtomicLong txGenerator = new AtomicLong(System.currentTimeMillis());
> java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
> DataOutputStream os = new DataOutputStream(baos);
> os.writeLong(txGenerator.incrementAndGet());
> os.close();
> final byte[] bs = baos.toByteArray();
> return new Xid() {
> @Override
> public int getFormatId() {
> return 86;
> }
> @Override
> public byte[] getGlobalTransactionId() {
> return bs;
> }
> @Override
> public byte[] getBranchQualifier() {
> return bs;
> }
> };
> }
> @Test
> public void failoverWithExceptionProgrammaticBrokers() throws Exception {
> BrokerService broker1 = createBroker(true, "tcp://localhost:11001");
> broker1.setBrokerName("broker1");
> BrokerService broker2 = createBroker(true, "tcp://localhost:11002");
> broker2.setBrokerName("broker2");
> XAConnection producerConnection = null;
> ActiveMQXAConnection consumerConnection = null;
> try {
> System.setProperty("org.slf4j.simpleLogger.log." +
> FailoverTransport.class.getName(), "DEBUG");
> broker1.start();
> broker2.start();
> await().atMost(Duration.ofSeconds(10)).until(() -> broker1.isStarted()
> && broker2.isStarted());
> broker1.addNetworkConnector("static:(tcp://localhost:11002)");
> broker2.addNetworkConnector("static:(tcp://localhost:11001)");
> broker1.getNetworkConnectors().get(0).start();
> broker2.getNetworkConnectors().get(0).start();
> await().atMost(Duration.ofSeconds(30))
> .until(() -> broker1.getNetworkConnectors().get(0).isStarted() &&
> broker2.getNetworkConnectors().get(0).isStarted());
> String queueName = "MY_QUEUE";
> String url = "failover:(tcp://localhost:11001,tcp://localhost:11002)";
> ActiveMQXAConnectionFactory firstFactory = new
> ActiveMQXAConnectionFactory(url);
> producerConnection = firstFactory.createXAConnection();
> producerConnection.setClientID("PRODUCER");
> producerConnection.start();
> XASession producerSession = producerConnection.createXASession();
> Queue producerDestination = producerSession.createQueue(queueName);
> Xid xid = createXid();
> producerSession.getXAResource().start(xid, XAResource.TMNOFLAGS);
> String messageId = UUID.randomUUID().toString();
> MessageProducer producer =
> producerSession.createProducer(producerDestination);
> TextMessage sendMessage = producerSession.createTextMessage("Test
> message");
> sendMessage.setStringProperty("my_id", messageId);
> producer.send(sendMessage);
> producerSession.getXAResource().end(xid, XAResource.TMSUCCESS);
> producerSession.getXAResource().prepare(xid);
> producerSession.getXAResource().commit(xid, false);
> consumerConnection = (ActiveMQXAConnection)
> firstFactory.createXAConnection();
> consumerConnection.setClientID("CONSUMER");
> consumerConnection.start();
> XASession consumerSession = consumerConnection.createXASession();
> Queue consumerDestination = consumerSession.createQueue(queueName);
> ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)
> consumerSession.createConsumer(consumerDestination);
> Xid consumerXid = createXid();
> consumerSession.getXAResource().start(consumerXid,
> XAResource.TMNOFLAGS);
> Message message = getMessage(messageId, consumer);
> consumerSession.getXAResource().end(consumerXid, XAResource.TMSUCCESS);
> consumerSession.getXAResource().prepare(consumerXid);
> logger.info("Simulating dropped connection");
> FailoverTransport transport =
> consumerConnection.getTransport().narrow(FailoverTransport.class);
> URI currentTransport = transport.getConnectedTransportURI();
> transport.handleTransportFailure(new IOException("Fake fail"));
> await().atMost(Duration.ofSeconds(10)).until(() ->
> !Objects.equals(currentTransport, transport.getConnectedTransportURI()) &&
> transport.isConnected());
> Assert.assertTrue(transport.isConnected());
> Assert.assertNotEquals(currentTransport,
> transport.getConnectedTransportURI());
> message.acknowledge();
> consumerSession.getXAResource().commit(consumerXid, false);
> } catch (XAException e) {
> if (e.errorCode == -4) {
> logger.info("Recreated error successfully");
> } else {
> logger.error("Got XAException " + e.errorCode, e);
> Assert.fail();
> }
> } finally {
> producerConnection.close();
> consumerConnection.close();
> broker1.getNetworkConnectors().get(0).stop();
> broker2.getNetworkConnectors().get(0).stop();
> await().atMost(Duration.ofSeconds(5)).until(() ->
> broker1.getNetworkConnectors().get(0).isStopped() &&
> broker2.getNetworkConnectors().get(0).isStopped());
> broker1.stop();
> broker2.stop();
> }
> }
> }
> {code}
> pom.xml:
> {code:xml}
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd">
> <modelVersion>4.0.0</modelVersion>
> <groupId>test</groupId>
> <artifactId>cluster-test</artifactId>
> <version>1</version>
> <properties>
> <!-- <version.activemq>5.11.0.redhat-630475</version.activemq>-->
> <version.activemq>5.16.0</version.activemq>
> <maven.compiler.target>1.8</maven.compiler.target>
> <maven.compiler.source>1.8</maven.compiler.source>
> </properties>
> <dependencies>
> <dependency>
> <groupId>junit</groupId>
> <artifactId>junit</artifactId>
> <version>4.13</version>
> </dependency>
> <dependency>
> <groupId>org.apache.activemq</groupId>
> <artifactId>activemq-broker</artifactId>
> <version>${version.activemq}</version>
> <scope>test</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.activemq</groupId>
> <artifactId>activemq-kahadb-store</artifactId>
> <version>${version.activemq}</version>
> <scope>test</scope>
> </dependency>
> <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-simple</artifactId>
> <version>1.7.30</version>
> <scope>test</scope>
> </dependency>
> <dependency>
> <groupId>org.awaitility</groupId>
> <artifactId>awaitility</artifactId>
> <version>4.0.3</version>
> <scope>test</scope>
> </dependency>
> </dependencies>
> </project>
> {code}
> Thanks
--
This message was sent by Atlassian Jira
(v8.3.4#803005)