ARTEMIS-302 more changes around XA reliability (resilience on failures)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/af1f79bf Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/af1f79bf Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/af1f79bf Branch: refs/heads/master Commit: af1f79bff503ee02ac119efceac65928f671fd1e Parents: a2c8e6b Author: Clebert Suconic <[email protected]> Authored: Mon Dec 14 20:11:53 2015 -0500 Committer: Clebert Suconic <[email protected]> Committed: Wed Dec 16 10:19:35 2015 -0500 ---------------------------------------------------------------------- .../core/client/impl/ClientSessionImpl.java | 14 +- .../activemq/artemis/ra/ActiveMQRALogger.java | 10 +- .../artemis/ra/ActiveMQRAManagedConnection.java | 2 +- .../artemis/ra/ActiveMQRAXAResource.java | 21 +- .../artemis/ra/ActiveMQResourceAdapter.java | 188 +++--- .../artemis/ra/inflow/ActiveMQActivation.java | 76 +-- .../ra/inflow/ActiveMQMessageHandler.java | 3 +- .../impl/journal/JournalStorageManager.java | 3 +- .../core/server/cluster/impl/BridgeImpl.java | 23 + .../core/server/impl/ServerConsumerImpl.java | 3 +- .../core/server/impl/ServerSessionImpl.java | 9 +- .../core/transaction/impl/TransactionImpl.java | 148 ++-- .../transaction/impl/TransactionImplTest.java | 673 +++++++++++++++++++ .../byteman/ClusteredBridgeReconnectTest.java | 228 +++++++ .../byteman/ConcurrentDeliveryCancelTest.java | 92 +-- .../tests/extras/byteman/TimeoutXATest.java | 12 +- ...MDBMultipleHandlersServerDisconnectTest.java | 116 +++- .../cluster/bridge/BridgeReconnectTest.java | 4 + .../failover/AsynchronousFailoverTest.java | 8 +- .../integration/ra/ResourceAdapterTest.java | 48 +- .../integration/xa/BasicXaRecoveryTest.java | 3 + .../tests/unit/ra/ResourceAdapterTest.java | 24 +- 22 files changed, 1371 insertions(+), 337 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index dc89680..221413c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -539,8 +539,13 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi rollback(false); } - @Override - public void rollback(final boolean isLastMessageAsDelivered) throws ActiveMQException { + public void rollback(final boolean isLastMessageAsDelivered) throws ActiveMQException + { + rollback(isLastMessageAsDelivered, true); + } + + public void rollback(final boolean isLastMessageAsDelivered, final boolean waitConsumers) throws ActiveMQException + { if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { ActiveMQClientLogger.LOGGER.trace("calling rollback(isLastMessageAsDelivered=" + isLastMessageAsDelivered + ")"); } @@ -559,7 +564,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // We need to make sure we don't get any inflight messages for (ClientConsumerInternal consumer : cloneConsumers()) { - consumer.clear(true); + consumer.clear(waitConsumers); } // Acks must be flushed here *after connection is stopped and all onmessages finished executing @@ -1173,7 +1178,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi try { if (rollbackOnly) { try { - rollback(); + rollback(false, false); } catch (Throwable ignored) { ActiveMQClientLogger.LOGGER.debug("Error on rollback during end call!", ignored); @@ -1252,6 +1257,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return sessionContext.configureTransactionTimeout(seconds); } catch (Throwable t) { + markRollbackOnly(); // The TM will ignore any errors from here, if things are this screwed up we mark rollbackonly // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java index 6ec3a5e..7853f09 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java @@ -74,20 +74,20 @@ public interface ActiveMQRALogger extends BasicLogger { void awaitingJMSServerCreation(); @LogMessage(level = Logger.Level.INFO) - @Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections.", format = Message.Format.MESSAGE_FORMAT) - void rebalancingConnections(); + @Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections on even {0}.", format = Message.Format.MESSAGE_FORMAT) + void rebalancingConnections(String event); @LogMessage(level = Logger.Level.WARN) @Message(id = 152001, value = "problem resetting xa session after failure", format = Message.Format.MESSAGE_FORMAT) - void problemResettingXASession(); + void problemResettingXASession(@Cause Throwable t); @LogMessage(level = Logger.Level.WARN) @Message(id = 152002, value = "Unable to roll local transaction back", format = Message.Format.MESSAGE_FORMAT) void unableToRollbackTX(); @LogMessage(level = Logger.Level.WARN) - @Message(id = 152003, value = "unable to reset session after failure", format = Message.Format.MESSAGE_FORMAT) - void unableToResetSession(); + @Message(id = 152003, value = "unable to reset session after failure, we will place the MDB Inflow now in setup mode for activation={0}" , format = Message.Format.MESSAGE_FORMAT) + void unableToResetSession(String spec, @Cause Exception e); @LogMessage(level = Logger.Level.WARN) @Message(id = 152004, value = "Handling JMS exception failure", format = Message.Format.MESSAGE_FORMAT) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java index 3cd1515..97c6032 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java @@ -811,7 +811,7 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc private void createCF() { if (connectionFactory == null) { - connectionFactory = ra.createActiveMQConnectionFactory(mcf.getProperties()); + connectionFactory = ra.getConnectionFactory(mcf.getProperties()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java index 093ee0f..9d485f3 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java @@ -21,8 +21,8 @@ import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.client.impl.ActiveMQXAResource; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; /** * ActiveMQXAResource. @@ -76,13 +76,18 @@ public class ActiveMQRAXAResource implements ActiveMQXAResource { ClientSessionInternal sessionInternal = (ClientSessionInternal) xaResource; try { - //this resets any tx stuff, we assume here that the tm and jca layer are well behaved when it comes to this - sessionInternal.resetIfNeeded(); - } - catch (ActiveMQException e) { - ActiveMQRALogger.LOGGER.problemResettingXASession(); - } - try { + try { + //this resets any tx stuff, we assume here that the tm and jca layer are well behaved when it comes to this + sessionInternal.resetIfNeeded(); + } + catch (ActiveMQException e) { + ActiveMQRALogger.LOGGER.problemResettingXASession(e); + + XAException xaException = new XAException(XAException.XAER_RMFAIL); + xaException.initCause(e); + throw xaException; + } + xaResource.start(xid, flags); } finally { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java index f9781c1..8c1a9b4 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java @@ -29,11 +29,12 @@ import javax.transaction.xa.XAResource; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Hashtable; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; @@ -141,7 +142,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { raProperties = new ActiveMQRAProperties(); configured = new AtomicBoolean(false); - activations = new ConcurrentHashMap<>(); + activations = Collections.synchronizedMap(new IdentityHashMap<ActivationSpec, ActiveMQActivation>()); recoveryManager = new RecoveryManager(); } @@ -1570,7 +1571,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { */ protected void setup() throws ActiveMQException { raProperties.init(); - defaultActiveMQConnectionFactory = createActiveMQConnectionFactory(raProperties); + defaultActiveMQConnectionFactory = newConnectionFactory(raProperties); recoveryActiveMQConnectionFactory = createRecoveryActiveMQConnectionFactory(raProperties); Map<String, String> recoveryConfProps = new HashMap<>(); @@ -1623,126 +1624,133 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { raProperties.setJgroupsChannelRefName(jgroupsChannelRefName); } - public synchronized ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) { + public synchronized ActiveMQConnectionFactory getConnectionFactory(final ConnectionFactoryProperties overrideProperties) { ActiveMQConnectionFactory cf; boolean known = false; if (!knownConnectionFactories.keySet().contains(overrideProperties)) { - List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); + cf = newConnectionFactory(overrideProperties); + knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1))); + } + else { + Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(overrideProperties); + cf = pair.getA(); + pair.getB().incrementAndGet(); + known = true; + } - String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress(); + if (known && cf.getServerLocator().isClosed()) { + knownConnectionFactories.remove(overrideProperties); + cf = newConnectionFactory(overrideProperties); + knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1))); + } - Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA(); + return cf; + } - String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile(); + public ActiveMQConnectionFactory newConnectionFactory(ConnectionFactoryProperties overrideProperties) { + ActiveMQConnectionFactory cf; + List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); - String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName(); + String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress(); - String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass(); + Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA(); - if (ha == null) { - ha = ActiveMQClient.DEFAULT_IS_HA; - } + String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile(); - if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) { - BroadcastEndpointFactory endpointFactory = null; + String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName(); - if (jgroupsLocatorClassName != null) { - String jchannelRefName = raProperties.getJgroupsChannelRefName(); - JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName); - endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel); - } - else if (discoveryAddress != null) { - Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort(); - if (discoveryPort == null) { - discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT; - } - - String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); - endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1); - } - else if (jgroupsFileName != null) { - endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName); - } - Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout(); - if (refreshTimeout == null) { - refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT; - } + String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass(); + + if (ha == null) { + ha = ActiveMQClient.DEFAULT_IS_HA; + } - Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout(); + if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) { + BroadcastEndpointFactory endpointFactory = null; - if (initialTimeout == null) { - initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; + if (jgroupsLocatorClassName != null) { + String jchannelRefName = raProperties.getJgroupsChannelRefName(); + JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName); + endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel); + } + else if (discoveryAddress != null) { + Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort(); + if (discoveryPort == null) { + discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT; } - DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory); + String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); + endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1); + } + else if (jgroupsFileName != null) { + endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName); + } + Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout(); + if (refreshTimeout == null) { + refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT; + } - if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { - ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha); - } + Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout(); - if (ha) { - cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF); - } - else { - cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF); - } + if (initialTimeout == null) { + initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; } - else if (connectorClassName != null) { - TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()]; - List<Map<String, Object>> connectionParams; - if (overrideProperties.getParsedConnectorClassNames() != null) { - connectionParams = overrideProperties.getParsedConnectionParameters(); - } - else { - connectionParams = raProperties.getParsedConnectionParameters(); - } + DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory); - for (int i = 0; i < connectorClassName.size(); i++) { - TransportConfiguration tc; - if (connectionParams == null || i >= connectionParams.size()) { - tc = new TransportConfiguration(connectorClassName.get(i)); - ActiveMQRALogger.LOGGER.debug("No connector params provided using default"); - } - else { - tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i)); - } - - transportConfigurations[i] = tc; - } + if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { + ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha); + } - if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { - ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" + - Arrays.toString(transportConfigurations) + " with ha=" + ha); - } + if (ha) { + cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF); + } + else { + cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF); + } + } + else if (connectorClassName != null) { + TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()]; + + List<Map<String, Object>> connectionParams; + if (overrideProperties.getParsedConnectorClassNames() != null) { + connectionParams = overrideProperties.getParsedConnectionParameters(); + } + else { + connectionParams = raProperties.getParsedConnectionParameters(); + } - if (ha) { - cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations); + for (int i = 0; i < connectorClassName.size(); i++) { + TransportConfiguration tc; + if (connectionParams == null || i >= connectionParams.size()) { + tc = new TransportConfiguration(connectorClassName.get(i)); + ActiveMQRALogger.LOGGER.debug("No connector params provided using default"); } else { - cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations); + tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i)); } + + transportConfigurations[i] = tc; } - else { - throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory"); + + if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { + ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" + + Arrays.toString(transportConfigurations) + " with ha=" + ha); } - setParams(cf, overrideProperties); - knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1))); + if (ha) { + cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations); + } + else { + cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations); + } } else { - Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(overrideProperties); - cf = pair.getA(); - pair.getB().incrementAndGet(); - known = true; - } - - if (known && cf.getServerLocator().isClosed()) { - knownConnectionFactories.remove(overrideProperties); - cf = createActiveMQConnectionFactory(overrideProperties); + throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory"); } + setParams(cf, overrideProperties); return cf; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java index 79b9cb9..d7a242d 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java @@ -48,7 +48,6 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; -import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.ra.ActiveMQRABundle; @@ -420,12 +419,19 @@ public class ActiveMQActivation { // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up } - if (threadTearDown.isAlive()) { - if (factory != null) { - // This will interrupt any threads waiting on reconnect + if (factory != null) { + try { + // closing the factory will help making sure pending threads are closed factory.close(); - factory = null; } + catch (Throwable e) { + ActiveMQRALogger.LOGGER.warn(e); + } + + factory = null; + } + + if (threadTearDown.isAlive()) { threadTearDown.interrupt(); try { @@ -440,11 +446,6 @@ public class ActiveMQActivation { } } - if (spec.isHasBeenUpdated() && factory != null) { - ra.closeConnectionFactory(spec); - factory = null; - } - nodes.clear(); lastReceived = false; @@ -465,23 +466,11 @@ public class ActiveMQActivation { factory = (ActiveMQConnectionFactory) fac; } else { - ActiveMQRAConnectionFactory raFact = (ActiveMQRAConnectionFactory) fac; - if (spec.isHasBeenUpdated()) { - factory = raFact.getResourceAdapter().createActiveMQConnectionFactory(spec); - } - else { - factory = raFact.getDefaultFactory(); - if (factory != ra.getDefaultActiveMQConnectionFactory()) { - ActiveMQRALogger.LOGGER.warnDifferentConnectionfactory(); - } - } + factory = ra.newConnectionFactory(spec); } } - else if (spec.isHasBeenUpdated()) { - factory = ra.createActiveMQConnectionFactory(spec); - } else { - factory = ra.getDefaultActiveMQConnectionFactory(); + factory = ra.newConnectionFactory(spec); } } @@ -627,9 +616,18 @@ public class ActiveMQActivation { return buffer.toString(); } - public void rebalance() { - ActiveMQRALogger.LOGGER.rebalancingConnections(); - reconnect(null); + public void startReconnectThread(final String threadName) { + if (trace) { + ActiveMQRALogger.LOGGER.trace("Starting reconnect Thread " + threadName + " on MDB activation " + this); + } + Runnable runnable = new Runnable() { + @Override + public void run() { + reconnect(null); + } + }; + Thread t = new Thread(runnable, threadName); + t.start(); } /** @@ -638,6 +636,9 @@ public class ActiveMQActivation { * @param failure if reconnecting in the event of a failure */ public void reconnect(Throwable failure) { + if (trace) { + ActiveMQRALogger.LOGGER.trace("reconnecting activation " + this); + } if (failure != null) { if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) { ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination()); @@ -728,6 +729,7 @@ public class ActiveMQActivation { } private class RebalancingListener implements ClusterTopologyListener { + @Override public void nodeUP(TopologyMember member, boolean last) { boolean newNode = false; @@ -741,14 +743,8 @@ public class ActiveMQActivation { } if (lastReceived && newNode) { - Runnable runnable = new Runnable() { - @Override - public void run() { - rebalance(); - } - }; - Thread t = new Thread(runnable, "NodeUP Connection Rebalancer"); - t.start(); + ActiveMQRALogger.LOGGER.rebalancingConnections("nodeUp " + member.toString()); + startReconnectThread("NodeUP Connection Rebalancer"); } else if (last) { lastReceived = true; @@ -759,14 +755,8 @@ public class ActiveMQActivation { public void nodeDown(long eventUID, String nodeID) { if (nodes.remove(nodeID)) { removedNodes.put(nodeID, eventUID); - Runnable runnable = new Runnable() { - @Override - public void run() { - rebalance(); - } - }; - Thread t = new Thread(runnable, "NodeDOWN Connection Rebalancer"); - t.start(); + ActiveMQRALogger.LOGGER.rebalancingConnections("nodeDown " + nodeID); + startReconnectThread("NodeDOWN Connection Rebalancer"); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java index a180fc7..b0d64cc 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java @@ -386,7 +386,8 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList session.resetIfNeeded(); } catch (ActiveMQException e) { - ActiveMQRALogger.LOGGER.unableToResetSession(); + ActiveMQRALogger.LOGGER.unableToResetSession(activation.toString(), e); + activation.startReconnectThread("Reset MessageHandler after Failure Thread"); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index fbf9655..77cfd0d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -27,6 +27,7 @@ import java.security.MessageDigest; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -2924,7 +2925,7 @@ public class JournalStorageManager implements StorageManager { @Override public String toString() { - return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]"; + return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "(" + new Date(scheduledDeliveryTime) + ")]"; } private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index b8b30bc..f638fbc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -28,11 +28,13 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; @@ -218,6 +220,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return bytes; } + // for tests + public ClientSessionFactory getSessionFactory() { + return csf; + } + /* (non-Javadoc) * @see org.apache.activemq.artemis.core.server.Consumer#getDeliveringMessages() */ @@ -905,8 +912,24 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled scheduleRetryConnect(); } } + catch (ActiveMQInterruptedException e) { + ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this); + } + catch (InterruptedException e) { + ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this); + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this); + if (csf != null) { + try { + csf.close(); + csf = null; + } + catch (Throwable ignored) { + } + } + fail(false); + scheduleRetryConnect(); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index e04c35c..4a8b16a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -285,7 +285,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // If the consumer is stopped then we don't accept the message, it // should go back into the // queue for delivery later. - if (!started || transferring || !callback.isWritable(this)) { + // TCP-flow control has to be done first than everything else otherwise we may lose notifications + if (!callback.isWritable(this) || !started || transferring ) { return HandleStatus.BUSY; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index d036fdf..e21102c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1047,7 +1047,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { ActiveMQServerLogger.LOGGER.xidReplacedOnXStart(tx.getXid().toString(), xid.toString()); try { - if (!tx.isEffective()) { + if (tx.getState() != Transaction.State.PREPARED) { // we don't want to rollback anything prepared here if (tx.getXid() != null) { resourceManager.removeTransaction(tx.getXid()); @@ -1085,7 +1085,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } if (theTX.isEffective()) { - ActiveMQServerLogger.LOGGER.debug("Client failed with Xid " + xid + " but the server already had it prepared"); + ActiveMQServerLogger.LOGGER.debug("Client failed with Xid " + xid + " but the server already had it " + theTX.getState()); tx = null; } else { @@ -1568,9 +1568,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (theTx.getState() == State.ROLLEDBACK) { Transaction newTX = newTransaction(); cancelAndRollback(clientFailed, newTX, wasStarted, toCancel); - throw new IllegalStateException("Transaction has already been rolled back"); } - cancelAndRollback(clientFailed, theTx, wasStarted, toCancel); + else { + cancelAndRollback(clientFailed, theTx, wasStarted, toCancel); + } } private void cancelAndRollback(boolean clientFailed, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index 3490fee..ee90c4a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -22,6 +22,7 @@ import java.util.Date; import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -32,6 +33,8 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation; public class TransactionImpl implements Transaction { + private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); + private List<TransactionOperation> operations; private static final int INITIAL_NUM_PROPERTIES = 10; @@ -105,8 +108,7 @@ public class TransactionImpl implements Transaction { @Override public boolean isEffective() { - return state == State.PREPARED || state == State.COMMITTED; - + return state == State.PREPARED || state == State.COMMITTED || state == State.ROLLEDBACK; } @Override @@ -141,32 +143,43 @@ public class TransactionImpl implements Transaction { @Override public boolean hasTimedOut(final long currentTime, final int defaultTimeout) { - if (timeoutSeconds == -1) { - return getState() != Transaction.State.PREPARED && currentTime > createTime + defaultTimeout * 1000; - } - else { - return getState() != Transaction.State.PREPARED && currentTime > createTime + timeoutSeconds * 1000; + synchronized (timeoutLock) { + boolean timedout; + if (timeoutSeconds == -1) { + timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + defaultTimeout * 1000; + } + else { + timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + timeoutSeconds * 1000; + } + + if (timedout) { + markAsRollbackOnly(new ActiveMQException("TX Timeout")); + } + + return timedout; } } @Override public void prepare() throws Exception { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::prepare::" + this); + } storageManager.readLock(); try { synchronized (timeoutLock) { if (isEffective()) { - ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has already been prepared or committed before, just ignoring the prepare call"); + ActiveMQServerLogger.LOGGER.debug("TransactionImpl::prepare::" + this + " is being ignored"); return; } if (state == State.ROLLBACK_ONLY) { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this); + } + + internalRollback(); + if (exception != null) { - // this TX will never be rolled back, - // so we reset it now - beforeRollback(); - afterRollback(); - if (operations != null) { - operations.clear(); - } throw exception; } else { @@ -216,14 +229,17 @@ public class TransactionImpl implements Transaction { @Override public void commit(final boolean onePhase) throws Exception { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::commit::" + this); + } synchronized (timeoutLock) { if (state == State.COMMITTED) { // I don't think this could happen, but just in case - ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been committed before, just ignoring the commit call"); + ActiveMQServerLogger.LOGGER.debug("TransactionImpl::commit::" + this + " is being ignored"); return; } if (state == State.ROLLBACK_ONLY) { - rollback(); + internalRollback(); if (exception != null) { throw exception; @@ -236,12 +252,12 @@ public class TransactionImpl implements Transaction { if (xid != null) { if (onePhase && state != State.ACTIVE || !onePhase && state != State.PREPARED) { - throw new IllegalStateException("Transaction is in invalid state " + state); + throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); } } else { if (state != State.ACTIVE) { - throw new IllegalStateException("Transaction is in invalid state " + state); + throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); } } @@ -249,6 +265,11 @@ public class TransactionImpl implements Transaction { doCommit(); + // We want to make sure that nothing else gets done after the commit is issued + // this will eliminate any possibility or races + final List<TransactionOperation> operationsToComplete = this.operations; + this.operations = null; + // We use the Callback even for non persistence // If we are using non-persistence with replication, the replication manager will have // to execute this runnable in the correct order @@ -263,7 +284,7 @@ public class TransactionImpl implements Transaction { @Override public void done() { - afterCommit(); + afterCommit(operationsToComplete); } }); @@ -285,44 +306,65 @@ public class TransactionImpl implements Transaction { @Override public void rollback() throws Exception { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::rollback::" + this); + } + synchronized (timeoutLock) { if (state == State.ROLLEDBACK) { // I don't think this could happen, but just in case - ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been rolledBack before, just ignoring the rollback call", new Exception("trace")); + ActiveMQServerLogger.LOGGER.debug("TransactionImpl::rollback::" + this + " is being ignored"); return; } if (xid != null) { if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY) { - throw new IllegalStateException("Transaction is in invalid state " + state); + throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); } } else { if (state != State.ACTIVE && state != State.ROLLBACK_ONLY) { - throw new IllegalStateException("Transaction is in invalid state " + state); + throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); } } - beforeRollback(); - - doRollback(); - state = State.ROLLEDBACK; + internalRollback(); + } + } - // We use the Callback even for non persistence - // If we are using non-persistence with replication, the replication manager will have - // to execute this runnable in the correct order - storageManager.afterCompleteOperations(new IOCallback() { + private void internalRollback() throws Exception { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::internalRollback " + this); + } - @Override - public void onError(final int errorCode, final String errorMessage) { - ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage); - } + beforeRollback(); - @Override - public void done() { - afterRollback(); - } - }); + try { + doRollback(); + state = State.ROLLEDBACK; + } + catch (IllegalStateException e) { + // Something happened before and the TX didn't make to the Journal / Storage + // We will like to execute afterRollback and clear anything pending + ActiveMQServerLogger.LOGGER.warn(e); } + // We want to make sure that nothing else gets done after the commit is issued + // this will eliminate any possibility or races + final List<TransactionOperation> operationsToComplete = this.operations; + this.operations = null; + + // We use the Callback even for non persistence + // If we are using non-persistence with replication, the replication manager will have + // to execute this runnable in the correct order + storageManager.afterCompleteOperations(new IOCallback() { + + public void onError(final int errorCode, final String errorMessage) { + ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage); + } + + public void done() { + afterRollback(operationsToComplete); + } + }); } @Override @@ -361,10 +403,14 @@ public class TransactionImpl implements Transaction { } @Override - public void markAsRollbackOnly(final ActiveMQException exception1) { + public void markAsRollbackOnly(final ActiveMQException exception) { synchronized (timeoutLock) { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::" + this + " marking rollbackOnly for " + exception.toString() + ", msg=" + exception.getMessage()); + } + if (isEffective()) { - ActiveMQServerLogger.LOGGER.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared or committed!)"); + ActiveMQServerLogger.LOGGER.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared, committed or rolledback!)"); return; } @@ -373,7 +419,7 @@ public class TransactionImpl implements Transaction { } state = State.ROLLBACK_ONLY; - this.exception = exception1; + this.exception = exception; } } @@ -434,19 +480,23 @@ public class TransactionImpl implements Transaction { } } - private synchronized void afterCommit() { - if (operations != null) { - for (TransactionOperation operation : operations) { + private synchronized void afterCommit(List<TransactionOperation> oeprationsToComplete) { + if (oeprationsToComplete != null) { + for (TransactionOperation operation : oeprationsToComplete) { operation.afterCommit(this); } + // Help out GC here + oeprationsToComplete.clear(); } } - private synchronized void afterRollback() { - if (operations != null) { - for (TransactionOperation operation : operations) { + private synchronized void afterRollback(List<TransactionOperation> oeprationsToComplete) { + if (oeprationsToComplete != null) { + for (TransactionOperation operation : oeprationsToComplete) { operation.afterRollback(this); } + // Help out GC here + oeprationsToComplete.clear(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java new file mode 100644 index 0000000..3909c3c --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -0,0 +1,673 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.transaction.impl; + +import javax.transaction.xa.Xid; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.journal.JournalLoadInformation; +import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.paging.PageTransactionInfo; +import org.apache.activemq.artemis.core.paging.PagedMessage; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.cursor.PagePosition; +import org.apache.activemq.artemis.core.persistence.GroupingInfo; +import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; +import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; +import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.replication.ReplicationManager; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.RouteContextList; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; +import org.apache.activemq.artemis.core.server.impl.JournalLoader; +import org.apache.activemq.artemis.core.transaction.ResourceManager; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperation; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class TransactionImplTest extends ActiveMQTestBase { + + @Test + public void testTimeoutAndThenCommitWithARollback() throws Exception { + TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10); + Assert.assertTrue(tx.hasTimedOut(System.currentTimeMillis() + 60000, 10)); + + final AtomicInteger commit = new AtomicInteger(0); + final AtomicInteger rollback = new AtomicInteger(0); + + tx.addOperation(new TransactionOperation() { + @Override + public void beforePrepare(Transaction tx) throws Exception { + + } + + @Override + public void afterPrepare(Transaction tx) { + + } + + @Override + public void beforeCommit(Transaction tx) throws Exception { + + } + + @Override + public void afterCommit(Transaction tx) { + System.out.println("commit..."); + commit.incrementAndGet(); + } + + @Override + public void beforeRollback(Transaction tx) throws Exception { + + } + + @Override + public void afterRollback(Transaction tx) { + System.out.println("rollback..."); + rollback.incrementAndGet(); + } + + @Override + public List<MessageReference> getRelatedMessageReferences() { + return null; + } + + @Override + public List<MessageReference> getListOnConsumer(long consumerID) { + return null; + } + }); + + for (int i = 0; i < 2; i++) { + try { + tx.commit(); + Assert.fail("Exception expected!"); + } + catch (ActiveMQException expected) { + } + } + + // it should just be ignored! + tx.rollback(); + + Assert.assertEquals(0, commit.get()); + Assert.assertEquals(1, rollback.get()); + + } + + @Test + public void testTimeoutThenRollbackWithRollback() throws Exception { + TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10); + Assert.assertTrue(tx.hasTimedOut(System.currentTimeMillis() + 60000, 10)); + + final AtomicInteger commit = new AtomicInteger(0); + final AtomicInteger rollback = new AtomicInteger(0); + + tx.addOperation(new TransactionOperation() { + @Override + public void beforePrepare(Transaction tx) throws Exception { + + } + + @Override + public void afterPrepare(Transaction tx) { + + } + + @Override + public void beforeCommit(Transaction tx) throws Exception { + + } + + @Override + public void afterCommit(Transaction tx) { + System.out.println("commit..."); + commit.incrementAndGet(); + } + + @Override + public void beforeRollback(Transaction tx) throws Exception { + + } + + @Override + public void afterRollback(Transaction tx) { + System.out.println("rollback..."); + rollback.incrementAndGet(); + } + + @Override + public List<MessageReference> getRelatedMessageReferences() { + return null; + } + + @Override + public List<MessageReference> getListOnConsumer(long consumerID) { + return null; + } + }); + + tx.rollback(); + + // This is a case where another failure was detected (In parallel with the TX timeout for instance) + tx.markAsRollbackOnly(new ActiveMQException("rollback only again")); + tx.rollback(); + + Assert.assertEquals(0, commit.get()); + Assert.assertEquals(1, rollback.get()); + + } + + class FakeSM implements StorageManager { + + @Override + public OperationContext getContext() { + return null; + } + + @Override + public void lineUpContext() { + + } + + @Override + public OperationContext newContext(Executor executor) { + return null; + } + + @Override + public OperationContext newSingleThreadContext() { + return null; + } + + @Override + public void setContext(OperationContext context) { + + } + + @Override + public void stop(boolean ioCriticalError) throws Exception { + + } + + @Override + public void pageClosed(SimpleString storeName, int pageNumber) { + + } + + @Override + public void pageDeleted(SimpleString storeName, int pageNumber) { + + } + + @Override + public void pageWrite(PagedMessage message, int pageNumber) { + + } + + @Override + public void afterCompleteOperations(IOCallback run) { + run.done(); + } + + @Override + public boolean waitOnOperations(long timeout) throws Exception { + return false; + } + + @Override + public void waitOnOperations() throws Exception { + + } + + @Override + public void beforePageRead() throws Exception { + + } + + @Override + public void afterPageRead() throws Exception { + + } + + @Override + public ByteBuffer allocateDirectBuffer(int size) { + return null; + } + + @Override + public void freeDirectBuffer(ByteBuffer buffer) { + + } + + @Override + public void clearContext() { + + } + + @Override + public void confirmPendingLargeMessageTX(Transaction transaction, + long messageID, + long recordID) throws Exception { + + } + + @Override + public void confirmPendingLargeMessage(long recordID) throws Exception { + + } + + @Override + public void storeMessage(ServerMessage message) throws Exception { + + } + + @Override + public void storeReference(long queueID, long messageID, boolean last) throws Exception { + + } + + @Override + public void deleteMessage(long messageID) throws Exception { + + } + + @Override + public void storeAcknowledge(long queueID, long messageID) throws Exception { + + } + + @Override + public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception { + + } + + @Override + public void updateDeliveryCount(MessageReference ref) throws Exception { + + } + + @Override + public void updateScheduledDeliveryTime(MessageReference ref) throws Exception { + + } + + @Override + public void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception { + + } + + @Override + public void deleteDuplicateID(long recordID) throws Exception { + + } + + @Override + public void storeMessageTransactional(long txID, ServerMessage message) throws Exception { + + } + + @Override + public void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception { + + } + + @Override + public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception { + + } + + @Override + public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception { + + } + + @Override + public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception { + + } + + @Override + public void deleteCursorAcknowledge(long ackID) throws Exception { + + } + + @Override + public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception { + + } + + @Override + public void deletePageComplete(long ackID) throws Exception { + + } + + @Override + public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception { + + } + + @Override + public void storeDuplicateIDTransactional(long txID, + SimpleString address, + byte[] duplID, + long recordID) throws Exception { + + } + + @Override + public void updateDuplicateIDTransactional(long txID, + SimpleString address, + byte[] duplID, + long recordID) throws Exception { + + } + + @Override + public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception { + + } + + @Override + public LargeServerMessage createLargeMessage() { + return null; + } + + @Override + public LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception { + return null; + } + + @Override + public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) { + return null; + } + + @Override + public void prepare(long txID, Xid xid) throws Exception { + + } + + @Override + public void commit(long txID) throws Exception { + + } + + @Override + public void commit(long txID, boolean lineUpContext) throws Exception { + + } + + @Override + public void rollback(long txID) throws Exception { + + } + + @Override + public void rollbackBindings(long txID) throws Exception { + + } + + @Override + public void commitBindings(long txID) throws Exception { + + } + + @Override + public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception { + + } + + @Override + public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception { + + } + + @Override + public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception { + + } + + @Override + public void deletePageTransactional(long recordID) throws Exception { + + } + + @Override + public JournalLoadInformation loadMessageJournal(PostOffice postOffice, + PagingManager pagingManager, + ResourceManager resourceManager, + Map<Long, QueueBindingInfo> queueInfos, + Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap, + Set<Pair<Long, Long>> pendingLargeMessages, + List<PageCountPending> pendingNonTXPageCounter, + JournalLoader journalLoader) throws Exception { + return null; + } + + @Override + public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception { + return 0; + } + + @Override + public void deleteHeuristicCompletion(long id) throws Exception { + + } + + @Override + public void addQueueBinding(long tx, Binding binding) throws Exception { + + } + + @Override + public void deleteQueueBinding(long tx, long queueBindingID) throws Exception { + + } + + @Override + public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, + List<GroupingInfo> groupingInfos) throws Exception { + return null; + } + + @Override + public void addGrouping(GroupBinding groupBinding) throws Exception { + + } + + @Override + public void deleteGrouping(long tx, GroupBinding groupBinding) throws Exception { + + } + + @Override + public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception { + + } + + @Override + public void deleteAddressSetting(SimpleString addressMatch) throws Exception { + + } + + @Override + public List<PersistedAddressSetting> recoverAddressSettings() throws Exception { + return null; + } + + @Override + public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception { + + } + + @Override + public void deleteSecurityRoles(SimpleString addressMatch) throws Exception { + + } + + @Override + public List<PersistedRoles> recoverPersistedRoles() throws Exception { + return null; + } + + @Override + public long storePageCounter(long txID, long queueID, long value) throws Exception { + return 0; + } + + @Override + public long storePendingCounter(long queueID, long pageID, int inc) throws Exception { + return 0; + } + + @Override + public void deleteIncrementRecord(long txID, long recordID) throws Exception { + + } + + @Override + public void deletePageCounter(long txID, long recordID) throws Exception { + + } + + @Override + public void deletePendingPageCounter(long txID, long recordID) throws Exception { + + } + + @Override + public long storePageCounterInc(long txID, long queueID, int add) throws Exception { + return 0; + } + + @Override + public long storePageCounterInc(long queueID, int add) throws Exception { + return 0; + } + + @Override + public Journal getBindingsJournal() { + return null; + } + + @Override + public Journal getMessageJournal() { + return null; + } + + @Override + public void startReplication(ReplicationManager replicationManager, + PagingManager pagingManager, + String nodeID, + boolean autoFailBack, + long initialReplicationSyncTimeout) throws Exception { + + } + + @Override + public boolean addToPage(PagingStore store, + ServerMessage msg, + Transaction tx, + RouteContextList listCtx) throws Exception { + return false; + } + + @Override + public void stopReplication() { + + } + + @Override + public void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception { + + } + + @Override + public void storeID(long journalID, long id) throws Exception { + + } + + @Override + public void deleteID(long journalD) throws Exception { + + } + + @Override + public void readLock() { + + } + + @Override + public void readUnLock() { + + } + + @Override + public void persistIdGenerator() { + + } + + @Override + public void start() throws Exception { + + } + + @Override + public void stop() throws Exception { + + } + + @Override + public boolean isStarted() { + return false; + } + + @Override + public long generateID() { + return 0; + } + + @Override + public long getCurrentID() { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java new file mode 100644 index 0000000..3874937 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java @@ -0,0 +1,228 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import java.util.concurrent.CountDownLatch; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.management.CoreNotificationType; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.management.Notification; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * This will simulate a failure of a failure. + * The bridge could eventually during a race or multiple failures not be able to reconnect because it failed again. + * this should make the bridge to always reconnect itself. + */ +@RunWith(BMUnitRunner.class) +public class ClusteredBridgeReconnectTest extends ClusterTestBase { + + static ThreadLocal<Boolean> inConnect = new ThreadLocal<Boolean>(); + + public static void enterConnect() { + inConnect.set(Boolean.TRUE); + } + + public static void exitConnect() { + inConnect.set(null); + } + + public static volatile boolean shouldFail = false; + + public static void send() { + if (inConnect.get() != null) { + if (shouldFail) { + shouldFail = false; + throw new NullPointerException("just because it's a test..."); + } + } + } + + @Test + @BMRules( + rules = {@BMRule( + name = "enter", + targetClass = "org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl", + targetMethod = "connect", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.enterConnect();"), @BMRule( + name = "exit", + targetClass = "org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl", + targetMethod = "connect", + targetLocation = "EXIT", + action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.exitConnect();"), @BMRule( + name = "send", + targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl", + targetMethod = "send(org.apache.activemq.artemis.core.protocol.core.Packet)", + targetLocation = "EXIT", + action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.send();") + + }) + public void testReconnectBridge() throws Exception { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + createQueue(0, "queues.testaddress", "queue0", null, true); + createQueue(1, "queues.testaddress", "queue0", null, true); + + addConsumer(0, 0, "queue0", null); + addConsumer(1, 1, "queue0", null); + + waitForBindings(0, "queues.testaddress", 1, 1, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + + waitForBindings(0, "queues.testaddress", 1, 1, false); + waitForBindings(1, "queues.testaddress", 1, 1, false); + + ClientSession session0 = sfs[0].createSession(); + ClientSession session1 = sfs[0].createSession(); + + session0.start(); + session1.start(); + + ClientProducer producer = session0.createProducer("queues.testaddress"); + + int NUMBER_OF_MESSAGES = 100; + + Assert.assertEquals(1, servers[0].getClusterManager().getClusterConnections().size()); + + ClusterConnectionImpl connection = servers[0].getClusterManager().getClusterConnections().toArray(new ClusterConnectionImpl[0])[0]; + Assert.assertEquals(1, connection.getRecords().size()); + + MessageFlowRecord record = connection.getRecords().values().toArray(new MessageFlowRecord[1])[0]; + ClusterConnectionBridge bridge = (ClusterConnectionBridge) record.getBridge(); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + ClientMessage msg = session0.createMessage(true); + producer.send(msg); + session0.commit(); + + if (i == 17) { + shouldFail = true; + bridge.getSessionFactory().getConnection().fail(new ActiveMQException("failed once!")); + } + } + + int cons0Count = 0, cons1Count = 0; + + while (true) { + ClientMessage msg = consumers[0].getConsumer().receive(1000); + if (msg == null) { + break; + } + cons0Count++; + msg.acknowledge(); + session0.commit(); + } + + while (true) { + ClientMessage msg = consumers[1].getConsumer().receive(1000); + if (msg == null) { + break; + } + cons1Count++; + msg.acknowledge(); + session1.commit(); + } + + Assert.assertEquals("cons0 = " + cons0Count + ", cons1 = " + cons1Count, NUMBER_OF_MESSAGES, cons0Count + cons1Count); + + session0.commit(); + session1.commit(); + + stopServers(0, 1); + + } + + static CountDownLatch latch; + static CountDownLatch latch2; + static Thread main; + + public static void pause(SimpleString clusterName) { + if (clusterName.toString().startsWith("queue0")) { + try { + latch2.countDown(); + latch.await(); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static void pause2(Notification notification) { + if (notification.getType() == CoreNotificationType.BINDING_REMOVED) { + SimpleString clusterName = notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME); + boolean inMain = main == Thread.currentThread(); + if (clusterName.toString().startsWith("queue0") && !inMain) { + try { + latch2.countDown(); + latch.await(); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + public static void restart2() { + latch.countDown(); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + shouldFail = false; + } + + @Override + @After + public void tearDown() throws Exception { + closeAllConsumers(); + closeAllSessionFactories(); + closeAllServerLocatorsFactories(); + super.tearDown(); + } + + public boolean isNetty() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java index 5b2dd0d..52cc9d8 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java @@ -74,6 +74,11 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase { latchFlag.setCount(1); } + @Override + protected boolean usePersistence() { + return true; + } + @Test @BMRules( rules = {@BMRule( @@ -84,6 +89,7 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase { action = "org.apache.activemq.artemis.tests.extras.byteman.ConcurrentDeliveryCancelTest.enterCancel();")}) public void testConcurrentCancels() throws Exception { + System.out.println(server.getConfiguration().getJournalLocation().toString()); server.getAddressSettingsRepository().clear(); AddressSettings settings = new AddressSettings(); settings.setMaxDeliveryAttempts(-1); @@ -184,18 +190,6 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase { } } }); - // - // consumer.close(); - // - // threads.add(new Thread("ClientFailing") - // { - // public void run() - // { - // ClientSessionInternal impl = (ClientSessionInternal) ((HornetQSession)theSession).getCoreSession(); - // impl.getConnection().fail(new HornetQException("failure")); - // } - // }); - // for (Thread t : threads) { t.start(); @@ -213,47 +207,55 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase { } Connection connection = cf.createConnection(); - connection.setClientID("myID"); - - Thread.sleep(2000); // I am too lazy to call end on all the transactions - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(queue); - HashMap<Integer, AtomicInteger> mapCount = new HashMap<>(); - - while (true) { - TextMessage message = (TextMessage) consumer.receiveNoWait(); - if (message == null) { - break; - } + try { + connection.setClientID("myID"); - Integer value = message.getIntProperty("i"); + Thread.sleep(5000); // I am too lazy to call end on all the transactions + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + HashMap<Integer, AtomicInteger> mapCount = new HashMap<>(); + + while (true) { + TextMessage message = (TextMessage) consumer.receiveNoWait(); + if (message == null) { + break; + } - AtomicInteger count = mapCount.get(value); - if (count == null) { - count = new AtomicInteger(0); - mapCount.put(message.getIntProperty("i"), count); - } + Integer value = message.getIntProperty("i"); - count.incrementAndGet(); - } + AtomicInteger count = mapCount.get(value); + if (count == null) { + count = new AtomicInteger(0); + mapCount.put(message.getIntProperty("i"), count); + } - boolean failed = false; - for (int i = 0; i < numberOfMessages; i++) { - AtomicInteger count = mapCount.get(i); - if (count == null) { - System.out.println("Message " + i + " not received"); - failed = true; + count.incrementAndGet(); } - else if (count.get() > 1) { - System.out.println("Message " + i + " received " + count.get() + " times"); - failed = true; + + boolean failed = false; + for (int i = 0; i < numberOfMessages; i++) { + AtomicInteger count = mapCount.get(i); + if (count == null) { + System.out.println("Message " + i + " not received"); + failed = true; + } + else if (count.get() > 1) { + System.out.println("Message " + i + " received " + count.get() + " times"); + failed = true; + } } - } - Assert.assertFalse("test failed, look at the system.out of the test for more infomration", failed); + if (failed) { + System.err.println("Failed"); + System.exit(-1); + } - connection.close(); + Assert.assertFalse("test failed, look at the system.out of the test for more infomration", failed); + } + finally { + connection.close(); + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java index 22611bc..76679ef 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java @@ -27,6 +27,7 @@ import javax.jms.XASession; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -91,7 +92,7 @@ public class TimeoutXATest extends ActiveMQTestBase { @BMRule( name = "afterRollback TX", targetClass = "org.apache.activemq.artemis.core.transaction.impl.TransactionImpl", - targetMethod = "afterRollback()", + targetMethod = "afterRollback", targetLocation = "ENTRY", helper = "org.apache.activemq.artemis.tests.extras.byteman.TimeoutXATest", action = "afterRollback()")}) @@ -166,23 +167,20 @@ public class TimeoutXATest extends ActiveMQTestBase { Thread.sleep(1000); removingTXAwait0.countDown(); - enteredRollbackLatch.await(); + Assert.assertTrue(enteredRollbackLatch.await(10, TimeUnit.SECONDS)); waitingRollbackLatch.countDown(); t.join(); consumer.close(); -// -// connction2.start(); -// + consumer = session.createConsumer(queue); for (int i = 0; i < 10; i++) { Assert.assertNotNull(consumer.receive(5000)); } Assert.assertNull(consumer.receiveNoWait()); -// session.commit(); -// session.close(); + connection.close(); connction2.close();
