This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 91cbbb8  ARTEMIS-2559 Connection failure should rollback pending XA TX
     new 8969045  This closes #2899
91cbbb8 is described below

commit 91cbbb86981ee04507a7d3abbda2aad9e5f23f9d
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Nov 21 15:13:38 2019 -0500

    ARTEMIS-2559 Connection failure should rollback pending XA TX
---
 .../core/server/impl/ServerSessionImpl.java        |  35 +++-
 .../artemis/core/transaction/Transaction.java      |   5 +
 .../core/transaction/impl/TransactionImpl.java     |  21 +++
 .../tests/integration/xa/SessionFailureXATest.java | 200 +++++++++++++++++++++
 .../core/postoffice/impl/BindingsImplTest.java     |   5 +
 5 files changed, 260 insertions(+), 6 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 08f2d4e..7b385ce 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -139,6 +139,10 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    protected Transaction tx;
 
+   /** This will store the Transaction between xaEnd and xaPrepare or xaCommit.
+    *  in a failure scenario (client is gone), this will be held between xaEnd 
and xaCommit. */
+   protected volatile Transaction pendingTX;
+
    protected boolean xa;
 
    protected final PagingManager pagingManager;
@@ -384,13 +388,28 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
          if (closed)
             return;
 
-         if (tx != null && tx.getXid() == null) {
-            // We only rollback local txs on close, not XA tx branches
+         if (failed) {
 
-            try {
-               rollback(failed, false);
-            } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.unableToRollbackOnClose(e);
+            Transaction txToRollback = tx;
+            if (txToRollback != null) {
+               txToRollback.rollbackIfPossible();
+            }
+
+            txToRollback = pendingTX;
+
+            if (txToRollback != null) {
+               txToRollback.rollbackIfPossible();
+            }
+
+         } else {
+            if (tx != null && tx.getXid() == null) {
+               // We only rollback local txs on close, not XA tx branches
+
+               try {
+                  rollback(failed, false);
+               } catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.unableToRollbackOnClose(e);
+               }
             }
          }
       }
@@ -1252,6 +1271,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    @Override
    public synchronized void xaCommit(final Xid xid, final boolean onePhase) 
throws Exception {
+      this.pendingTX = null;
 
       if (tx != null && tx.getXid().equals(xid)) {
          final String msg = "Cannot commit, session is currently doing work in 
transaction " + tx.getXid();
@@ -1310,6 +1330,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
                throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
             }
          } else {
+            this.pendingTX = tx;
             tx = null;
          }
       } else {
@@ -1395,6 +1416,8 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    @Override
    public synchronized void xaRollback(final Xid xid) throws Exception {
+      this.pendingTX = null;
+
       if (tx != null && tx.getXid().equals(xid)) {
          final String msg = "Cannot roll back, session is currently doing work 
in a transaction " + tx.getXid();
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
index 6fa2c5f..6a6be0a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
@@ -51,6 +51,11 @@ public interface Transaction {
 
    void rollback() throws Exception;
 
+   /** In a ServerSession failure scenario,\
+    *  we may try to rollback, however only if it's not prepared.
+    *  In case it's prepared, we will just let it be and let the transaction 
manager to deal with it */
+   void rollbackIfPossible();
+
    long getID();
 
    Xid getXid();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 95983b7..8e22bbb 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -358,6 +358,27 @@ public class TransactionImpl implements Transaction {
    }
 
    @Override
+   public void rollbackIfPossible() {
+      synchronized (timeoutLock) {
+         if (state == State.ROLLEDBACK) {
+            // I don't think this could happen, but just in case
+            logger.debug("TransactionImpl::rollbackIfPossible::" + this + " is 
being ignored");
+            return;
+         }
+         if (state != State.PREPARED) {
+            try {
+               internalRollback(sorted);
+            } catch (Exception e) {
+               // nothing we can do beyond logging
+               // no need to special handler here as this was not even 
supposed to happen at this point
+               // even if it happenes this would be the exception of the 
exception, so we just log here
+               logger.warn(e.getMessage(), e);
+            }
+         }
+      }
+   }
+
+   @Override
    public void rollback() throws Exception {
       if (logger.isTraceEnabled()) {
          logger.trace("TransactionImpl::rollback::" + this);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/SessionFailureXATest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/SessionFailureXATest.java
new file mode 100644
index 0000000..73ecc47
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/SessionFailureXATest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.xa;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.StoreConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class SessionFailureXATest extends ActiveMQTestBase {
+
+   private static IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
+   private final Map<String, AddressSettings> addressSettings = new 
HashMap<>();
+
+   private ActiveMQServer messagingService;
+
+   private ClientSession clientSession;
+
+   private ClientSessionFactory sessionFactory;
+
+   private Configuration configuration;
+
+   private final SimpleString atestq = new SimpleString("BasicXaTestq");
+
+   private ServerLocator locator;
+
+   private StoreConfiguration.StoreType storeType;
+
+   public SessionFailureXATest(StoreConfiguration.StoreType storeType) {
+      this.storeType = storeType;
+   }
+
+   @Parameterized.Parameters(name = "storeType={0}")
+   public static Collection<Object[]> data() {
+      Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}};
+      return Arrays.asList(params);
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      addressSettings.clear();
+
+      if (storeType == StoreConfiguration.StoreType.DATABASE) {
+         configuration = createDefaultJDBCConfig(true);
+      } else {
+         configuration = createDefaultNettyConfig();
+      }
+
+      messagingService = createServer(true, configuration, -1, -1, 
addressSettings);
+
+      // start the server
+      messagingService.start();
+
+      locator = createInVMNonHALocator();
+      locator.setAckBatchSize(0);
+      sessionFactory = createSessionFactory(locator);
+
+      clientSession = addClientSession(sessionFactory.createSession(true, 
false, false));
+
+      clientSession.createQueue(atestq, atestq, null, true);
+   }
+
+   @Test
+   public void testFailureWithXAEnd() throws Exception {
+      testFailure(true);
+   }
+
+   @Test
+   public void testFailureWithoutXAEnd() throws Exception {
+      testFailure(false);
+   }
+
+   public void testFailure(boolean xaEnd) throws Exception {
+
+      ClientSession clientSession2 = sessionFactory.createSession(false, true, 
true);
+      try {
+         ClientProducer clientProducer = clientSession2.createProducer(atestq);
+         ClientMessage m1 = createTextMessage(clientSession2, "m1");
+         ClientMessage m2 = createTextMessage(clientSession2, "m2");
+         ClientMessage m3 = createTextMessage(clientSession2, "m3");
+         ClientMessage m4 = createTextMessage(clientSession2, "m4");
+         clientProducer.send(m1);
+         clientProducer.send(m2);
+         clientProducer.send(m3);
+         clientProducer.send(m4);
+      } finally {
+         clientSession2.close();
+      }
+
+      Xid xid = newXID();
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientSession.setTransactionTimeout((int) TimeUnit.MINUTES.toMillis(10));
+      clientSession.start();
+      ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
+      ClientMessage m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
+      if (xaEnd) {
+         // We are validating both cases, where xaEnd succeeded and didn't 
succeed
+         // so this tests is parameterized to validate both cases.
+         clientSession.end(xid, XAResource.TMSUCCESS);
+      }
+
+      Wait.assertEquals(1, () -> messagingService.getSessions().size());
+
+      for (ServerSession serverSession : messagingService.getSessions()) {
+         serverSession.getRemotingConnection().fail(new 
ActiveMQException("fail this"));
+         serverSession.getRemotingConnection().disconnect(false);
+      }
+
+      Wait.assertEquals(0, () -> messagingService.getSessions().size());
+
+      locator = createInVMNonHALocator();
+      sessionFactory = createSessionFactory(locator);
+      clientSession = addClientSession(sessionFactory.createSession(true, 
false, false));
+
+      Wait.assertEquals(1, () -> messagingService.getSessions().size());
+
+      xid = newXID();
+
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientSession.setTransactionTimeout((int) TimeUnit.MINUTES.toMillis(10));
+      clientSession.start();
+      clientConsumer = clientSession.createConsumer(atestq);
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
+
+   }
+}
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 591f54a..e2766a3 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -140,6 +140,11 @@ public class BindingsImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void rollbackIfPossible() {
+
+      }
+
+      @Override
       public void commit(final boolean onePhase) throws Exception {
 
       }

Reply via email to