This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 91cbbb8 ARTEMIS-2559 Connection failure should rollback pending XA TX
new 8969045 This closes #2899
91cbbb8 is described below
commit 91cbbb86981ee04507a7d3abbda2aad9e5f23f9d
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Nov 21 15:13:38 2019 -0500
ARTEMIS-2559 Connection failure should rollback pending XA TX
---
.../core/server/impl/ServerSessionImpl.java | 35 +++-
.../artemis/core/transaction/Transaction.java | 5 +
.../core/transaction/impl/TransactionImpl.java | 21 +++
.../tests/integration/xa/SessionFailureXATest.java | 200 +++++++++++++++++++++
.../core/postoffice/impl/BindingsImplTest.java | 5 +
5 files changed, 260 insertions(+), 6 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 08f2d4e..7b385ce 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -139,6 +139,10 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
protected Transaction tx;
+ /** This will store the Transaction between xaEnd and xaPrepare or xaCommit.
+ * In a failure scenario (client is gone), this will be held between xaEnd and xaCommit. */
+ protected volatile Transaction pendingTX;
+
protected boolean xa;
protected final PagingManager pagingManager;
@@ -384,13 +388,28 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
if (closed)
return;
- if (tx != null && tx.getXid() == null) {
- // We only rollback local txs on close, not XA tx branches
+ if (failed) {
- try {
- rollback(failed, false);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.unableToRollbackOnClose(e);
+ Transaction txToRollback = tx;
+ if (txToRollback != null) {
+ txToRollback.rollbackIfPossible();
+ }
+
+ txToRollback = pendingTX;
+
+ if (txToRollback != null) {
+ txToRollback.rollbackIfPossible();
+ }
+
+ } else {
+ if (tx != null && tx.getXid() == null) {
+ // We only rollback local txs on close, not XA tx branches
+
+ try {
+ rollback(failed, false);
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.unableToRollbackOnClose(e);
+ }
}
}
}
@@ -1252,6 +1271,7 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
@Override
public synchronized void xaCommit(final Xid xid, final boolean onePhase)
throws Exception {
+ this.pendingTX = null;
if (tx != null && tx.getXid().equals(xid)) {
final String msg = "Cannot commit, session is currently doing work in
transaction " + tx.getXid();
@@ -1310,6 +1330,7 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
}
} else {
+ this.pendingTX = tx;
tx = null;
}
} else {
@@ -1395,6 +1416,8 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
@Override
public synchronized void xaRollback(final Xid xid) throws Exception {
+ this.pendingTX = null;
+
if (tx != null && tx.getXid().equals(xid)) {
final String msg = "Cannot roll back, session is currently doing work
in a transaction " + tx.getXid();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
index 6fa2c5f..6a6be0a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
@@ -51,6 +51,11 @@ public interface Transaction {
void rollback() throws Exception;
+ /** In a ServerSession failure scenario,
+ * we may try to rollback, however only if it's not prepared.
+ * In case it's prepared, we will just let it be and let the transaction manager deal with it. */
+ void rollbackIfPossible();
+
long getID();
Xid getXid();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 95983b7..8e22bbb 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -358,6 +358,27 @@ public class TransactionImpl implements Transaction {
}
@Override
+   public void rollbackIfPossible() {
+      // Used when the owning session has failed: roll the transaction back unless it
+      // is already prepared, in which case it is left alone for the transaction manager.
+      synchronized (timeoutLock) {
+         if (state == State.ROLLEDBACK) {
+            // I don't think this could happen, but just in case
+            if (logger.isDebugEnabled()) {
+               logger.debug("TransactionImpl::rollbackIfPossible::" + this + " is being ignored");
+            }
+            return;
+         }
+         if (state == State.PREPARED) {
+            // A prepared transaction is deliberately left untouched.
+            return;
+         }
+         try {
+            internalRollback(sorted);
+         } catch (Exception e) {
+            // Nothing we can do beyond logging.
+            // No special handling is needed here, as this was not even supposed to happen at this point;
+            // even if it happens it would be the exception of the exception, so we just log it.
+            logger.warn(e.getMessage(), e);
+         }
+      }
+   }
+
+ @Override
public void rollback() throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("TransactionImpl::rollback::" + this);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/SessionFailureXATest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/SessionFailureXATest.java
new file mode 100644
index 0000000..73ecc47
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/SessionFailureXATest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.xa;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.StoreConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Verifies that messages acknowledged inside an XA transaction are delivered again to a
+ * new session after the original connection is failed, both when xaEnd was called
+ * before the failure and when it was not.
+ */
+@RunWith(Parameterized.class)
+public class SessionFailureXATest extends ActiveMQTestBase {
+
+   private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
+   /** Bodies of the messages sent to the queue, in the order they are expected back. */
+   private static final String[] MESSAGE_BODIES = {"m1", "m2", "m3", "m4"};
+
+   /** XAResource#setTransactionTimeout takes seconds, so the 10 minutes are expressed in seconds. */
+   private static final int TX_TIMEOUT_SECONDS = (int) TimeUnit.MINUTES.toSeconds(10);
+
+   /** How long each receive call waits for a message, in milliseconds. */
+   private static final long RECEIVE_TIMEOUT_MILLIS = 1000;
+
+   private final Map<String, AddressSettings> addressSettings = new HashMap<>();
+
+   private ActiveMQServer messagingService;
+
+   private ClientSession clientSession;
+
+   private ClientSessionFactory sessionFactory;
+
+   private Configuration configuration;
+
+   private final SimpleString atestq = new SimpleString("BasicXaTestq");
+
+   private ServerLocator locator;
+
+   private final StoreConfiguration.StoreType storeType;
+
+   public SessionFailureXATest(StoreConfiguration.StoreType storeType) {
+      this.storeType = storeType;
+   }
+
+   /** Only the FILE store type is currently exercised. */
+   @Parameterized.Parameters(name = "storeType={0}")
+   public static Collection<Object[]> data() {
+      Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}};
+      return Arrays.asList(params);
+   }
+
+   /** Starts the server, opens an XA client session and creates the durable test queue. */
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      addressSettings.clear();
+
+      if (storeType == StoreConfiguration.StoreType.DATABASE) {
+         configuration = createDefaultJDBCConfig(true);
+      } else {
+         configuration = createDefaultNettyConfig();
+      }
+
+      messagingService = createServer(true, configuration, -1, -1, addressSettings);
+
+      // start the server
+      messagingService.start();
+
+      locator = createInVMNonHALocator();
+      locator.setAckBatchSize(0);
+      sessionFactory = createSessionFactory(locator);
+
+      clientSession = addClientSession(sessionFactory.createSession(true, false, false));
+
+      clientSession.createQueue(atestq, atestq, null, true);
+   }
+
+   @Test
+   public void testFailureWithXAEnd() throws Exception {
+      testFailure(true);
+   }
+
+   @Test
+   public void testFailureWithoutXAEnd() throws Exception {
+      testFailure(false);
+   }
+
+   /**
+    * Consumes every message inside an XA transaction, fails the connection, and then
+    * checks that a fresh XA session receives all of the messages again.
+    *
+    * @param xaEnd whether xaEnd is called on the transaction before the connection is failed
+    */
+   public void testFailure(boolean xaEnd) throws Exception {
+      sendMessages();
+
+      Xid xid = newXID();
+      beginXAWork(xid);
+      receiveAndAckAll();
+      if (xaEnd) {
+         // We are validating both cases, where xaEnd succeeded and where it didn't,
+         // so this test is parameterized to validate both cases.
+         clientSession.end(xid, XAResource.TMSUCCESS);
+      }
+
+      Wait.assertEquals(1, () -> messagingService.getSessions().size());
+
+      for (ServerSession serverSession : messagingService.getSessions()) {
+         serverSession.getRemotingConnection().fail(new ActiveMQException("fail this"));
+         serverSession.getRemotingConnection().disconnect(false);
+      }
+
+      Wait.assertEquals(0, () -> messagingService.getSessions().size());
+
+      locator = createInVMNonHALocator();
+      sessionFactory = createSessionFactory(locator);
+      clientSession = addClientSession(sessionFactory.createSession(true, false, false));
+
+      Wait.assertEquals(1, () -> messagingService.getSessions().size());
+
+      // All four messages must be available again to the new session.
+      beginXAWork(newXID());
+      receiveAndAckAll();
+   }
+
+   /** Sends one text message per entry of MESSAGE_BODIES using a separate auto-commit session. */
+   private void sendMessages() throws Exception {
+      ClientSession producerSession = sessionFactory.createSession(false, true, true);
+      try {
+         ClientProducer producer = producerSession.createProducer(atestq);
+         for (String body : MESSAGE_BODIES) {
+            producer.send(createTextMessage(producerSession, body));
+         }
+      } finally {
+         producerSession.close();
+      }
+   }
+
+   /** Starts an XA transaction branch on the current client session and starts delivery. */
+   private void beginXAWork(Xid xid) throws Exception {
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientSession.setTransactionTimeout(TX_TIMEOUT_SECONDS);
+      clientSession.start();
+   }
+
+   /** Receives and acknowledges every expected message, asserting the bodies arrive in order. */
+   private void receiveAndAckAll() throws Exception {
+      ClientConsumer consumer = clientSession.createConsumer(atestq);
+      for (String expectedBody : MESSAGE_BODIES) {
+         ClientMessage message = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
+         Assert.assertNotNull(message);
+         message.acknowledge();
+         Assert.assertEquals(expectedBody, message.getBodyBuffer().readString());
+      }
+   }
+}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 591f54a..e2766a3 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -140,6 +140,11 @@ public class BindingsImplTest extends ActiveMQTestBase {
}
@Override
+ public void rollbackIfPossible() {
+ // Intentionally empty: this test implementation performs no rollback work.
+ }
+
+ @Override
public void commit(final boolean onePhase) throws Exception {
}