Erik Håkansson created AMQ-8050:
-----------------------------------

             Summary: XAException when failing over in the middle of a 
transaction
                 Key: AMQ-8050
                 URL: https://issues.apache.org/jira/browse/AMQ-8050
             Project: ActiveMQ
          Issue Type: Bug
          Components: JMS client, Network of Brokers
    Affects Versions: 5.16.0
            Reporter: Erik Håkansson


We have been plagued in production by growing disk usage in KahaDB on our 
ActiveMQ brokers. We have found that this is caused by hanging transactions, and the 
only solution so far has been to restart the broker. The hanging transactions 
happen when we have the occasional network glitch. The networking is out of our 
control and not something we can fix.

However, we have found a workaround. Our clients are MDBs in WildFly. If we 
disable failover for these and instead let WildFly handle creating new 
connections, we don't see the issue.

I have been able to reproduce the error in a unit test. When there is a 
connection disturbance in the middle of a transaction (on the consumer end) and 
the client fails over to another broker in the network, it tries to commit the 
transaction on the new broker. 
 This fails with
{noformat}
Transaction 'XID:[...]' has not been started. xaErrorCode:-4
{noformat}
and the transaction ends up in an inconsistent state on the broker.

We are not using any replicated persistence adapters, just local kahaDB for 
each broker in the network.

I'm not sure whether the error is in the client, which can't handle failover 
during a transaction, or in the broker, which doesn't distribute the transaction 
properly to the other brokers in the network.

I'm also very open to the possibility that this is simply a configuration error 
on our end, but if so, I have no idea what.

I'm adding the unit test in which I have reproduced it. I happily admit that I 
don't know much about how transactions actually behave in practice, so I might 
have misconfigured them here, but we see exactly the same behaviour in production 
code where transactions are managed by WildFly.
{code:java}
package test;

import static org.awaitility.Awaitility.await;

import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQXAConnection;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

public class FailoverErrorTest {

  private static final Logger logger = 
LoggerFactory.getLogger(ClusterTest.class);

  private static BrokerService createBroker(boolean deleteAllMessagesOnStartup, 
String bindAddress) throws Exception {
    BrokerService broker = new BrokerService();
    broker.setUseJmx(true);
    broker.setAdvisorySupport(true);
    TransportConnector transportConnector = new TransportConnector();
    transportConnector.setName("openwire");
    transportConnector.setUri(URI.create(bindAddress));
    transportConnector.setRebalanceClusterClients(true);
    transportConnector.setUpdateClusterClientsOnRemove(true);
    broker.addConnector(transportConnector);
    broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);

    PolicyMap policyMap = new PolicyMap();
    PolicyEntry defaultEntry = new PolicyEntry();
    policyMap.setDefaultEntry(defaultEntry);
    broker.setDestinationPolicy(policyMap);

    return broker;
  }

  private static Message getMessage(String messageId, ActiveMQMessageConsumer 
consumer) throws JMSException {
    String receivedMessageId = null;
    Instant start = Instant.now();
    while (receivedMessageId == null || !Objects.equals(messageId, 
receivedMessageId)) {
      if (Instant.now().isAfter(start.plus(5, ChronoUnit.SECONDS))) {
        Assert.fail("timeout");
      }
      Message msg = consumer.receive(20000);
      Assert.assertNotNull("Couldn't get message", msg);
      receivedMessageId = msg.getStringProperty("my_id");
      if (!Objects.equals(messageId, receivedMessageId)) {
        logger.info("Got the wrong message. Looping.");
      } else {
        logger.info("Found message");
        return msg;
      }
    }
    return null;
  }

  private static Xid createXid() throws IOException {
    final AtomicLong txGenerator = new AtomicLong(System.currentTimeMillis());

    java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
    DataOutputStream os = new DataOutputStream(baos);
    os.writeLong(txGenerator.incrementAndGet());
    os.close();
    final byte[] bs = baos.toByteArray();

    return new Xid() {
      @Override
      public int getFormatId() {
        return 86;
      }

      @Override
      public byte[] getGlobalTransactionId() {
        return bs;
      }

      @Override
      public byte[] getBranchQualifier() {
        return bs;
      }
    };
  }

  @Test
  public void failoverWithExceptionProgrammaticBrokers() throws Exception {
    BrokerService broker1 = createBroker(true, "tcp://localhost:11001");
    broker1.setBrokerName("broker1");
    BrokerService broker2 = createBroker(true, "tcp://localhost:11002");
    broker2.setBrokerName("broker2");

    XAConnection producerConnection = null;
    ActiveMQXAConnection consumerConnection = null;
    try {
      System.setProperty("org.slf4j.simpleLogger.log." + 
FailoverTransport.class.getName(), "DEBUG");

      broker1.start();
      broker2.start();
      await().atMost(Duration.ofSeconds(10)).until(() -> broker1.isStarted() && 
broker2.isStarted());

      broker1.addNetworkConnector("static:(tcp://localhost:11002)");
      broker2.addNetworkConnector("static:(tcp://localhost:11001)");
      broker1.getNetworkConnectors().get(0).start();
      broker2.getNetworkConnectors().get(0).start();

      await().atMost(Duration.ofSeconds(30))
          .until(() -> broker1.getNetworkConnectors().get(0).isStarted() &&
              broker2.getNetworkConnectors().get(0).isStarted());

      String queueName = "MY_QUEUE";

      String url = "failover:(tcp://localhost:11001,tcp://localhost:11002)";
      ActiveMQXAConnectionFactory firstFactory = new 
ActiveMQXAConnectionFactory(url);
      producerConnection = firstFactory.createXAConnection();
      producerConnection.setClientID("PRODUCER");
      producerConnection.start();
      XASession producerSession = producerConnection.createXASession();
      Queue producerDestination = producerSession.createQueue(queueName);
      Xid xid = createXid();
      producerSession.getXAResource().start(xid, XAResource.TMNOFLAGS);
      String messageId = UUID.randomUUID().toString();
      MessageProducer producer = 
producerSession.createProducer(producerDestination);
      TextMessage sendMessage = producerSession.createTextMessage("Test 
message");
      sendMessage.setStringProperty("my_id", messageId);
      producer.send(sendMessage);
      producerSession.getXAResource().end(xid, XAResource.TMSUCCESS);
      producerSession.getXAResource().prepare(xid);
      producerSession.getXAResource().commit(xid, false);

      consumerConnection = (ActiveMQXAConnection) 
firstFactory.createXAConnection();
      consumerConnection.setClientID("CONSUMER");
      consumerConnection.start();
      XASession consumerSession = consumerConnection.createXASession();
      Queue consumerDestination = consumerSession.createQueue(queueName);
      ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) 
consumerSession.createConsumer(consumerDestination);

      Xid consumerXid = createXid();
      consumerSession.getXAResource().start(consumerXid, XAResource.TMNOFLAGS);
      Message message = getMessage(messageId, consumer);
      consumerSession.getXAResource().end(consumerXid, XAResource.TMSUCCESS);
      consumerSession.getXAResource().prepare(consumerXid);

      logger.info("Simulating dropped connection");
      FailoverTransport transport = 
consumerConnection.getTransport().narrow(FailoverTransport.class);
      URI currentTransport = transport.getConnectedTransportURI();
      transport.handleTransportFailure(new IOException("Fake fail"));
      await().atMost(Duration.ofSeconds(10)).until(() -> 
!Objects.equals(currentTransport, transport.getConnectedTransportURI()) && 
transport.isConnected());
      Assert.assertTrue(transport.isConnected());
      Assert.assertNotEquals(currentTransport, 
transport.getConnectedTransportURI());
      message.acknowledge();
      consumerSession.getXAResource().commit(consumerXid, false);
    } catch (XAException e) {
      if (e.errorCode == -4) {
        logger.info("Recreated error successfully");
      } else {
        logger.error("Got XAException " + e.errorCode, e);
        Assert.fail();
      }
    } finally {
      producerConnection.close();
      consumerConnection.close();
      broker1.getNetworkConnectors().get(0).stop();
      broker2.getNetworkConnectors().get(0).stop();
      await().atMost(Duration.ofSeconds(5)).until(() -> 
broker1.getNetworkConnectors().get(0).isStopped() && 
broker2.getNetworkConnectors().get(0).isStopped());
      broker1.stop();
      broker2.stop();
    }
  }
}
{code}
pom.xml:
{code:xml}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build for the AMQ-8050 reproduction test. All dependencies are test-scoped. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>


  <groupId>test</groupId>
  <artifactId>cluster-test</artifactId>
  <version>1</version>

  <properties>
    <!--    <version.activemq>5.11.0.redhat-630475</version.activemq>-->
    <version.activemq>5.16.0</version.activemq>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
  </properties>


  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-broker</artifactId>
      <version>${version.activemq}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-kahadb-store</artifactId>
      <version>${version.activemq}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.30</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.awaitility</groupId>
      <artifactId>awaitility</artifactId>
      <version>4.0.3</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

</project>


{code}
Thanks



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to