http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/JMSManagementService.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/JMSManagementService.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/JMSManagementService.java index 79c81d3..3f14f55 100644 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/JMSManagementService.java +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/JMSManagementService.java @@ -14,9 +14,9 @@ package org.apache.activemq.jms.server.management; import org.apache.activemq.api.jms.management.JMSServerControl; import org.apache.activemq.core.server.Queue; -import org.apache.activemq.jms.client.HornetQConnectionFactory; -import org.apache.activemq.jms.client.HornetQQueue; -import org.apache.activemq.jms.client.HornetQTopic; +import org.apache.activemq.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.jms.client.ActiveMQQueue; +import org.apache.activemq.jms.client.ActiveMQTopic; import org.apache.activemq.jms.server.JMSServerManager; import org.apache.activemq.jms.server.config.ConnectionFactoryConfiguration; @@ -31,15 +31,15 @@ public interface JMSManagementService void unregisterJMSServer() throws Exception; - void registerQueue(HornetQQueue queue, Queue serverQueue) throws Exception; + void registerQueue(ActiveMQQueue queue, Queue serverQueue) throws Exception; void unregisterQueue(String name) throws Exception; - void registerTopic(HornetQTopic topic) throws Exception; + void registerTopic(ActiveMQTopic topic) throws Exception; void unregisterTopic(String name) throws Exception; - void registerConnectionFactory(String name, ConnectionFactoryConfiguration config, HornetQConnectionFactory connectionFactory) throws Exception; + void 
registerConnectionFactory(String name, ConnectionFactoryConfiguration config, ActiveMQConnectionFactory connectionFactory) throws Exception; void unregisterConnectionFactory(String name) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/impl/JMSManagementServiceImpl.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/impl/JMSManagementServiceImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/impl/JMSManagementServiceImpl.java index 151a20d..650f97e 100644 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/impl/JMSManagementServiceImpl.java +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/impl/JMSManagementServiceImpl.java @@ -23,12 +23,12 @@ import org.apache.activemq.api.jms.management.JMSServerControl; import org.apache.activemq.api.jms.management.TopicControl; import org.apache.activemq.core.messagecounter.MessageCounter; import org.apache.activemq.core.messagecounter.MessageCounterManager; -import org.apache.activemq.core.server.HornetQServer; +import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.Queue; import org.apache.activemq.core.server.management.ManagementService; -import org.apache.activemq.jms.client.HornetQConnectionFactory; -import org.apache.activemq.jms.client.HornetQQueue; -import org.apache.activemq.jms.client.HornetQTopic; +import org.apache.activemq.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.jms.client.ActiveMQQueue; +import org.apache.activemq.jms.client.ActiveMQTopic; import org.apache.activemq.jms.management.impl.JMSConnectionFactoryControlImpl; import org.apache.activemq.jms.management.impl.JMSQueueControlImpl; import org.apache.activemq.jms.management.impl.JMSServerControlImpl; @@ -54,7 +54,7 @@ public class JMSManagementServiceImpl implements JMSManagementService // Static -------------------------------------------------------- - public 
JMSManagementServiceImpl(final ManagementService managementService, final HornetQServer server, final JMSServerManager jmsServerManager) + public JMSManagementServiceImpl(final ManagementService managementService, final ActiveMQServer server, final JMSServerManager jmsServerManager) { this.managementService = managementService; this.jmsServerManager = jmsServerManager; @@ -80,7 +80,7 @@ public class JMSManagementServiceImpl implements JMSManagementService managementService.unregisterFromRegistry(ResourceNames.JMS_SERVER); } - public synchronized void registerQueue(final HornetQQueue queue, final Queue serverQueue) throws Exception + public synchronized void registerQueue(final ActiveMQQueue queue, final Queue serverQueue) throws Exception { QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress()); MessageCounterManager messageCounterManager = managementService.getMessageCounterManager(); @@ -104,7 +104,7 @@ public class JMSManagementServiceImpl implements JMSManagementService managementService.unregisterFromRegistry(ResourceNames.JMS_QUEUE + name); } - public synchronized void registerTopic(final HornetQTopic topic) throws Exception + public synchronized void registerTopic(final ActiveMQTopic topic) throws Exception { ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(topic.getTopicName()); AddressControl addressControl = (AddressControl)managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress()); @@ -122,7 +122,7 @@ public class JMSManagementServiceImpl implements JMSManagementService public synchronized void registerConnectionFactory(final String name, final ConnectionFactoryConfiguration cfConfig, - final HornetQConnectionFactory connectionFactory) throws Exception + final ActiveMQConnectionFactory connectionFactory) throws Exception { ObjectName objectName = managementService.getObjectNameBuilder().getConnectionFactoryObjectName(name); 
JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(cfConfig, connectionFactory, jmsServerManager, name); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java new file mode 100644 index 0000000..797fe83 --- /dev/null +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java @@ -0,0 +1,250 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.jms.server.recovery; + +import javax.transaction.xa.XAResource; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.api.core.Pair; +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.jms.server.ActiveMQJMSServerLogger; +import org.jboss.tm.XAResourceRecovery; + +/** + * <p>This class is used by the Resource Adapter to register RecoveryDiscovery, which is based on the {@link XARecoveryConfig}</p> + * <p>Each outbound or inbound connection will pass its configuration through here by calling the method {@link ActiveMQRecoveryRegistry#register(XARecoveryConfig)}</p> + * <p>Later the {@link RecoveryDiscovery} will call {@link ActiveMQRecoveryRegistry#nodeUp(String, Pair, String, String)} + * so we will keep track of nodes on the cluster + * or nodes this server is connected to. </p> + * + * @author clebertsuconic + */ +public class ActiveMQRecoveryRegistry implements XAResourceRecovery +{ + + private static final ActiveMQRecoveryRegistry theInstance = new ActiveMQRecoveryRegistry(); + + private final ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery> configSet = new ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery>(); + + /** + * The list by server id and resource adapter wrapper, what will actually be calling recovery.
+ * This will be returned by getXAResources + */ + private final ConcurrentHashMap<String, ActiveMQXAResourceWrapper> recoveries = new ConcurrentHashMap<String, ActiveMQXAResourceWrapper>(); + + /** + * In case of failures, we retry on the next getXAResources + */ + private final Set<RecoveryDiscovery> failedDiscoverySet = new HashSet<RecoveryDiscovery>(); + + private ActiveMQRecoveryRegistry() + { + } + + /** + * This will be called periodically by the Transaction Manager + */ + public XAResource[] getXAResources() + { + try + { + checkFailures(); + + ActiveMQXAResourceWrapper[] resourceArray = new ActiveMQXAResourceWrapper[recoveries.size()]; + resourceArray = recoveries.values().toArray(resourceArray); + + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("\n======================================================================================="); + ActiveMQJMSServerLogger.LOGGER.debug("Returning the following list on getXAREsources:"); + for (Map.Entry<String, ActiveMQXAResourceWrapper> entry : recoveries.entrySet()) + { + ActiveMQJMSServerLogger.LOGGER.debug("server-id=" + entry.getKey() + ", value=" + entry.getValue()); + } + ActiveMQJMSServerLogger.LOGGER.debug("=======================================================================================\n"); + } + + return resourceArray; + } + catch (Throwable e) + { + ActiveMQJMSServerLogger.LOGGER.warn(e.getMessage(), e); + return new XAResource[]{}; + } + } + + public static ActiveMQRecoveryRegistry getInstance() + { + return theInstance; + } + + /** + * This will be called by then resource adapters, to register a new discovery + * + * @param resourceConfig + */ + public void register(final XARecoveryConfig resourceConfig) + { + RecoveryDiscovery newInstance = new RecoveryDiscovery(resourceConfig); + RecoveryDiscovery discoveryRecord = configSet.putIfAbsent(resourceConfig, newInstance); + if (discoveryRecord == null) + { + discoveryRecord = newInstance; + 
discoveryRecord.start(false); + } + // you could have a configuration shared with multiple MDBs or RAs + discoveryRecord.incrementUsage(); + } + + /** + * Reference counts and deactivates a configuration + * Notice: this won't remove the servers since a server may have previous XIDs + * + * @param resourceConfig + */ + public void unRegister(final XARecoveryConfig resourceConfig) + { + RecoveryDiscovery discoveryRecord = configSet.get(resourceConfig); + if (discoveryRecord != null && discoveryRecord.decrementUsage() == 0) + { + discoveryRecord = configSet.remove(resourceConfig); + if (discoveryRecord != null) + { + discoveryRecord.stop(); + } + } + } + + /** + * We need to make sure that all resources are closed, we don't actually do this when a resourceConfig is closed but + * maybe we should. + */ + public void stop() + { + for (RecoveryDiscovery recoveryDiscovery : configSet.values()) + { + recoveryDiscovery.stop(); + } + for (ActiveMQXAResourceWrapper activeMQXAResourceWrapper : recoveries.values()) + { + activeMQXAResourceWrapper.close(); + } + recoveries.clear(); + configSet.clear(); + } + + /** + * In case of a failure the Discovery will register itself to retry + * + * @param failedDiscovery + */ + public void failedDiscovery(RecoveryDiscovery failedDiscovery) + { + ActiveMQJMSServerLogger.LOGGER.debug("RecoveryDiscovery being set to restart:" + failedDiscovery); + synchronized (failedDiscoverySet) + { + failedDiscoverySet.add(failedDiscovery); + } + } + + /** + * @param nodeID + * @param networkConfiguration + * @param username + * @param password + */ + public void nodeUp(String nodeID, + Pair<TransportConfiguration, TransportConfiguration> networkConfiguration, + String username, + String password) + { + + if (recoveries.get(nodeID) == null) + { + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug(nodeID + " being registered towards " + networkConfiguration); + } + XARecoveryConfig config = new
XARecoveryConfig(true, + extractTransportConfiguration(networkConfiguration), + username, + password); + + ActiveMQXAResourceWrapper wrapper = new ActiveMQXAResourceWrapper(config); + recoveries.putIfAbsent(nodeID, wrapper); + } + } + + public void nodeDown(String nodeID) + { + } + + /** + * this will go through the list of retries + */ + private void checkFailures() + { + final HashSet<RecoveryDiscovery> failures = new HashSet<RecoveryDiscovery>(); + + // it will transfer all the discoveries to a new collection + synchronized (failedDiscoverySet) + { + failures.addAll(failedDiscoverySet); + failedDiscoverySet.clear(); + } + + if (failures.size() > 0) + { + // This shouldn't happen on a regular scenario, however when this retry happens this needs + // to be done on a new thread + Thread t = new Thread("ActiveMQ Recovery Discovery Reinitialization") + { + @Override + public void run() + { + for (RecoveryDiscovery discovery : failures) + { + try + { + ActiveMQJMSServerLogger.LOGGER.debug("Retrying discovery " + discovery); + discovery.start(true); + } + catch (Throwable e) + { + ActiveMQJMSServerLogger.LOGGER.warn(e.getMessage(), e); + } + } + } + }; + + t.start(); + } + } + + /** + * @param networkConfiguration + * @return + */ + private TransportConfiguration[] extractTransportConfiguration(Pair<TransportConfiguration, TransportConfiguration> networkConfiguration) + { + if (networkConfiguration.getB() != null) + { + return new TransportConfiguration[]{networkConfiguration.getA(), networkConfiguration.getB()}; + } + return new TransportConfiguration[]{networkConfiguration.getA()}; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java 
b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java new file mode 100644 index 0000000..d232831 --- /dev/null +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java @@ -0,0 +1,71 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.server.recovery; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.jboss.tm.XAResourceRecoveryRegistry; + +/** + * Base class for the integration layer. + * It is used at integration points and is just a bridge to the real registry at + * {@link ActiveMQRecoveryRegistry} + * + * @author Clebert + * + * + */ +public abstract class ActiveMQRegistryBase +{ + + private final AtomicBoolean started = new AtomicBoolean(false); + + public ActiveMQRegistryBase() + { + } + + + public abstract XAResourceRecoveryRegistry getTMRegistry(); + + public void register(final XARecoveryConfig resourceConfig) + { + init(); + ActiveMQRecoveryRegistry.getInstance().register(resourceConfig); + } + + + + public void unRegister(final XARecoveryConfig resourceConfig) + { + init(); + ActiveMQRecoveryRegistry.getInstance().unRegister(resourceConfig); + } + + public void stop() + { + if (started.compareAndSet(true, false) && getTMRegistry() != null) + { +
getTMRegistry().removeXAResourceRecovery(ActiveMQRecoveryRegistry.getInstance()); + ActiveMQRecoveryRegistry.getInstance().stop(); + } + } + + private void init() + { + if (started.compareAndSet(false, true) && getTMRegistry() != null) + { + getTMRegistry().addXAResourceRecovery(ActiveMQRecoveryRegistry.getInstance()); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java new file mode 100644 index 0000000..1cd59f4 --- /dev/null +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java @@ -0,0 +1,231 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.jms.server.recovery; + +import javax.transaction.xa.XAResource; +import java.util.HashMap; +import java.util.Map; + +import com.arjuna.ats.jta.recovery.XAResourceRecovery; +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.jms.server.ActiveMQJMSServerLogger; + +/** + * An XAResourceRecovery instance that can be used to recover any JMS provider. + * <p> + * In reality only recover, rollback and commit will be called but we still need to implement all + * methods just in case. + * <p> + * To enable this add the following to the jbossts-properties file + * <pre> + * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ1" + * value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory"/> + * </pre> + * <p> + * you'll need something like this if the ActiveMQ Server is remote + * <pre> + * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2" + * value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/> + * </pre> + * <p> + * you'll need something like this if the ActiveMQ Server is remote and has failover configured + * <pre> + * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2" + * value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/> + * </pre> + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @version <tt>$Revision: 1.1 $</tt> + */ +public class ActiveMQXAResourceRecovery implements XAResourceRecovery +{ + private final boolean
trace = ActiveMQJMSServerLogger.LOGGER.isTraceEnabled(); + + private boolean hasMore; + + private ActiveMQXAResourceWrapper res; + + public ActiveMQXAResourceRecovery() + { + if (trace) + { + ActiveMQJMSServerLogger.LOGGER.trace("Constructing ActiveMQXAResourceRecovery"); + } + } + + public boolean initialise(final String config) + { + if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.trace(this + " intialise: " + config); + } + + String[] configs = config.split(";"); + XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length]; + for (int i = 0, configsLength = configs.length; i < configsLength; i++) + { + String s = configs[i]; + ConfigParser parser = new ConfigParser(s); + String connectorFactoryClassName = parser.getConnectorFactoryClassName(); + Map<String, Object> connectorParams = parser.getConnectorParameters(); + String username = parser.getUsername(); + String password = parser.getPassword(); + TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams); + xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password); + } + + + res = new ActiveMQXAResourceWrapper(xaRecoveryConfigs); + + if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.trace(this + " initialised"); + } + + return true; + } + + public boolean hasMoreResources() + { + if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.trace(this + " hasMoreResources"); + } + + /* + * The way hasMoreResources is supposed to work is as follows: + * For each "sweep" the recovery manager will call hasMoreResources, then if it returns + * true it will call getXAResource. + * It will repeat that until hasMoreResources returns false. + * Then the sweep is over. + * For the next sweep hasMoreResources should return true, etc. 
+ * + * In our case where we only need to return one XAResource per sweep, + * hasMoreResources should basically alternate between true and false. + * + * + */ + + hasMore = !hasMore; + + return hasMore; + } + + public XAResource getXAResource() + { + if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.trace(this + " getXAResource"); + } + + return res; + } + + public XAResource[] getXAResources() + { + return new XAResource[]{res}; + } + + @Override + protected void finalize() + { + res.close(); + } + + public static class ConfigParser + { + private final String connectorFactoryClassName; + + private final Map<String, Object> connectorParameters; + + private String username; + + private String password; + + public ConfigParser(final String config) + { + if (config == null || config.length() == 0) + { + throw new IllegalArgumentException("Must specify provider connector factory class name in config"); + } + + String[] strings = config.split(","); + + // First (mandatory) param is the connector factory class name + if (strings.length < 1) + { + throw new IllegalArgumentException("Must specify provider connector factory class name in config"); + } + + connectorFactoryClassName = strings[0].trim(); + + // Next two (optional) parameters are the username and password to use for creating the session for recovery + + if (strings.length >= 2) + { + + username = strings[1].trim(); + if (username.length() == 0) + { + username = null; + } + + if (strings.length == 2) + { + throw new IllegalArgumentException("If username is specified, password must be specified too"); + } + + password = strings[2].trim(); + if (password.length() == 0) + { + password = null; + } + } + + // other tokens are for connector configurations + connectorParameters = new HashMap<String, Object>(); + if (strings.length >= 3) + { + for (int i = 3; i < strings.length; i++) + { + String[] str = strings[i].split("="); + if (str.length == 2) + { + 
connectorParameters.put(str[0].trim(), str[1].trim()); + } + } + } + } + + public String getConnectorFactoryClassName() + { + return connectorFactoryClassName; + } + + public Map<String, Object> getConnectorParameters() + { + return connectorParameters; + } + + public String getUsername() + { + return username; + } + + public String getPassword() + { + return password; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java new file mode 100644 index 0000000..04a7457 --- /dev/null +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java @@ -0,0 +1,531 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.jms.server.recovery; + +import java.util.Arrays; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.ActiveMQExceptionType; +import org.apache.activemq.api.core.ActiveMQNotConnectedException; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.api.core.client.ClientSessionFactory; +import org.apache.activemq.api.core.client.ActiveMQClient; +import org.apache.activemq.api.core.client.ServerLocator; +import org.apache.activemq.api.core.client.SessionFailureListener; +import org.apache.activemq.jms.server.ActiveMQJMSServerLogger; + +/** + * XAResourceWrapper. + * + * Mainly from org.jboss.server.XAResourceWrapper from the JBoss AS server module + * + * The reason why we don't use that class directly is that it assumes on failure of connection + * the RM_FAIL or RM_ERR is thrown, but in ActiveMQ we throw XA_RETRY since we want the recovery manager to be able + * to retry on failure without having to manually retry + * + * @author <a href="[email protected]">Adrian Brock</a> + * @author <a href="[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * + * @version $Revision: 45341 $ + */ +public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureListener +{ + /** The state lock */ + private static final Object lock = new Object(); + + private ServerLocator serverLocator; + + private ClientSessionFactory csf; + + private ClientSession delegate; + + private XARecoveryConfig[] xaRecoveryConfigs; + + public ActiveMQXAResourceWrapper(XARecoveryConfig... 
xaRecoveryConfigs) + { + this.xaRecoveryConfigs = xaRecoveryConfigs; + + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("Recovery configured with " + Arrays.toString(xaRecoveryConfigs) + + ", instance=" + + System.identityHashCode(this)); + } + } + + public Xid[] recover(final int flag) throws XAException + { + XAResource xaResource = getDelegate(false); + + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("looking for recover at " + xaResource + " configuration " + Arrays.toString(this.xaRecoveryConfigs)); + } + + try + { + Xid[] xids = xaResource.recover(flag); + + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled() && xids != null && xids.length > 0) + { + ActiveMQJMSServerLogger.LOGGER.debug("Recovering these following IDs " + Arrays.toString(xids) + " at " + this); + } + + return xids; + } + catch (XAException e) + { + ActiveMQJMSServerLogger.LOGGER.xaRecoverError(e); + throw check(e); + } + } + + public void commit(final Xid xid, final boolean onePhase) throws XAException + { + XAResource xaResource = getDelegate(true); + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("Commit " + xaResource + " xid " + " onePhase=" + onePhase); + } + try + { + xaResource.commit(xid, onePhase); + } + catch (XAException e) + { + throw check(e); + } + } + + public void rollback(final Xid xid) throws XAException + { + XAResource xaResource = getDelegate(true); + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("Rollback " + xaResource + " xid "); + } + try + { + xaResource.rollback(xid); + } + catch (XAException e) + { + throw check(e); + } + } + + public void forget(final Xid xid) throws XAException + { + XAResource xaResource = getDelegate(false); + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("Forget " + xaResource + " xid "); + } + + try 
+ { + xaResource.forget(xid); + } + catch (XAException e) + { + throw check(e); + } + } + + public boolean isSameRM(XAResource xaRes) throws XAException + { + if (xaRes instanceof ActiveMQXAResourceWrapper) + { + xaRes = ((ActiveMQXAResourceWrapper)xaRes).getDelegate(false); + } + + XAResource xaResource = getDelegate(false); + try + { + return xaResource.isSameRM(xaRes); + } + catch (XAException e) + { + throw check(e); + } + } + + public int prepare(final Xid xid) throws XAException + { + XAResource xaResource = getDelegate(true); + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("prepare " + xaResource + " xid "); + } + try + { + return xaResource.prepare(xid); + } + catch (XAException e) + { + throw check(e); + } + } + + public void start(final Xid xid, final int flags) throws XAException + { + XAResource xaResource = getDelegate(false); + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("start " + xaResource + " xid "); + } + try + { + xaResource.start(xid, flags); + } + catch (XAException e) + { + throw check(e); + } + } + + public void end(final Xid xid, final int flags) throws XAException + { + XAResource xaResource = getDelegate(false); + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("end " + xaResource + " xid "); + } + try + { + xaResource.end(xid, flags); + } + catch (XAException e) + { + throw check(e); + } + } + + public int getTransactionTimeout() throws XAException + { + XAResource xaResource = getDelegate(false); + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("getTransactionTimeout " + xaResource + " xid "); + } + try + { + return xaResource.getTransactionTimeout(); + } + catch (XAException e) + { + throw check(e); + } + } + + public boolean setTransactionTimeout(final int seconds) throws XAException + { + XAResource xaResource = getDelegate(false); + if 
(ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("setTransactionTimeout " + xaResource + " xid "); + } + try + { + return xaResource.setTransactionTimeout(seconds); + } + catch (XAException e) + { + throw check(e); + } + } + + public void connectionFailed(final ActiveMQException me, boolean failedOver) + { + if (me.getType() == ActiveMQExceptionType.DISCONNECTED) + { + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("being disconnected for server shutdown", me); + } + } + else + { + ActiveMQJMSServerLogger.LOGGER.xaRecoverConnectionError(me, csf); + } + close(); + } + + @Override + public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) + { + connectionFailed(me, failedOver); + } + + public void beforeReconnect(final ActiveMQException me) + { + } + + /** + * Get the connectionFactory XAResource + * + * @return the connectionFactory + * @throws XAException for any problem + */ + private XAResource getDelegate(boolean retry) throws XAException + { + XAResource result = null; + Exception error = null; + try + { + result = connect(); + } + catch (Exception e) + { + error = e; + } + + if (result == null) + { + // we should always throw a retry for certain methods (commit etc.); if not, the tx is marked as a heuristic and + // all chaos is let loose + if (retry) + { + XAException xae = new XAException("Connection unavailable for xa recovery"); + xae.errorCode = XAException.XA_RETRY; + if (error != null) + { + xae.initCause(error); + } + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae); + } + throw xae; + } + else + { + XAException xae = new XAException("Error trying to connect to any providers for xa recovery"); + xae.errorCode = XAException.XAER_RMERR; + if (error != null) + { + xae.initCause(error); + } + if
(ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae); + } + throw xae; + } + + } + + return result; + } + + /** + * Connect to the server if not already done so + * + * @return the connectionFactory XAResource + * @throws Exception for any problem + */ + protected XAResource connect() throws Exception + { + // Do we already have a valid connectionFactory? + synchronized (ActiveMQXAResourceWrapper.lock) + { + if (delegate != null) + { + return delegate; + } + } + + for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs) + { + + if (xaRecoveryConfig == null) + { + continue; + } + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs)); + } + + ClientSession cs = null; + + try + { + // setting ha=false because otherwise the connector would go towards any server, causing Heuristic exceptions + // we really need to control what server it's connected to + + // Manual configuration may still use discovery, so we will keep this + if (xaRecoveryConfig.getDiscoveryConfiguration() != null) + { + serverLocator = ActiveMQClient.createServerLocator(false, xaRecoveryConfig.getDiscoveryConfiguration()); + } + else + { + serverLocator = ActiveMQClient.createServerLocator(false, xaRecoveryConfig.getTransportConfig()); + } + serverLocator.disableFinalizeCheck(); + csf = serverLocator.createSessionFactory(); + if (xaRecoveryConfig.getUsername() == null) + { + cs = csf.createSession(true, false, false); + } + else + { + cs = csf.createSession(xaRecoveryConfig.getUsername(), + xaRecoveryConfig.getPassword(), + true, + false, + false, + false, + 1); + } + } + catch (Throwable e) + { + ActiveMQJMSServerLogger.LOGGER.xaRecoverAutoConnectionError(e, xaRecoveryConfig); + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) + { + 
ActiveMQJMSServerLogger.LOGGER.debug(e.getMessage(), e); + } + + try + { + if (cs != null) cs.close(); + if (serverLocator != null) serverLocator.close(); + } + catch (Throwable ignored) + { + if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled()) + { + ActiveMQJMSServerLogger.LOGGER.trace(e.getMessage(), ignored); + } + } + continue; + } + + cs.addFailureListener(this); + + synchronized (ActiveMQXAResourceWrapper.lock) + { + delegate = cs; + } + + return delegate; + } + ActiveMQJMSServerLogger.LOGGER.recoveryConnectFailed(Arrays.toString(xaRecoveryConfigs)); + throw new ActiveMQNotConnectedException(); + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() + { + return "ActiveMQXAResourceWrapper [serverLocator=" + serverLocator + + ", csf=" + + csf + + ", delegate=" + + delegate + + ", xaRecoveryConfigs=" + + Arrays.toString(xaRecoveryConfigs) + + ", instance=" + + System.identityHashCode(this) + + "]"; + } + + /** + * Close the connection + */ + public void close() + { + ServerLocator oldServerLocator = null; + ClientSessionFactory oldCSF = null; + ClientSession oldDelegate = null; + synchronized (ActiveMQXAResourceWrapper.lock) + { + oldCSF = csf; + csf = null; + oldDelegate = delegate; + delegate = null; + oldServerLocator = serverLocator; + serverLocator = null; + } + + if (oldDelegate != null) + { + try + { + oldDelegate.close(); + } + catch (Throwable ignorable) + { + ActiveMQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable); + } + } + + if (oldCSF != null) + { + try + { + oldCSF.close(); + } + catch (Throwable ignorable) + { + ActiveMQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable); + } + } + + if (oldServerLocator != null) + { + try + { + oldServerLocator.close(); + } + catch (Throwable ignorable) + { + ActiveMQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable); + } + } + } + + /** + * Check whether an XAException is fatal. 
If it is an RM problem + * we close the connection so the next call will reconnect. + * + * @param e the xa exception + * @return never + * @throws XAException always + */ + protected XAException check(final XAException e) throws XAException + { + ActiveMQJMSServerLogger.LOGGER.xaRecoveryError(e); + + + // If any exception happened, we close the connection so we may start fresh + close(); + throw e; + } + + @Override + protected void finalize() throws Throwable + { + close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRecoveryRegistry.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRecoveryRegistry.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRecoveryRegistry.java deleted file mode 100644 index d9b5c86..0000000 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRecoveryRegistry.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ -package org.apache.activemq.jms.server.recovery; - -import javax.transaction.xa.XAResource; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.activemq.api.core.Pair; -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.jms.server.HornetQJMSServerLogger; -import org.jboss.tm.XAResourceRecovery; - -/** - * <p>This class is used by the Resource Adapter to register RecoveryDiscovery, which is based on the {@link XARecoveryConfig}</p> - * <p>Each outbound or inboud connection will pass the configuration here through by calling the method {@link HornetQRecoveryRegistry#register(XARecoveryConfig)}</p> - * <p>Later the {@link RecoveryDiscovery} will call {@link HornetQRecoveryRegistry#nodeUp(String, Pair, String, String)} - * so we will keep a track of nodes on the cluster - * or nodes where this server is connected to. </p> - * - * @author clebertsuconic - */ -public class HornetQRecoveryRegistry implements XAResourceRecovery -{ - - private static final HornetQRecoveryRegistry theInstance = new HornetQRecoveryRegistry(); - - private final ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery> configSet = new ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery>(); - - /** - * The list by server id and resource adapter wrapper, what will actually be calling recovery. 
- * This will be returned by getXAResources - */ - private final ConcurrentHashMap<String, HornetQXAResourceWrapper> recoveries = new ConcurrentHashMap<String, HornetQXAResourceWrapper>(); - - /** - * In case of failures, we retry on the next getXAResources - */ - private final Set<RecoveryDiscovery> failedDiscoverySet = new HashSet<RecoveryDiscovery>(); - - private HornetQRecoveryRegistry() - { - } - - /** - * This will be called periodically by the Transaction Manager - */ - public XAResource[] getXAResources() - { - try - { - checkFailures(); - - HornetQXAResourceWrapper[] resourceArray = new HornetQXAResourceWrapper[recoveries.size()]; - resourceArray = recoveries.values().toArray(resourceArray); - - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("\n======================================================================================="); - HornetQJMSServerLogger.LOGGER.debug("Returning the following list on getXAREsources:"); - for (Map.Entry<String, HornetQXAResourceWrapper> entry : recoveries.entrySet()) - { - HornetQJMSServerLogger.LOGGER.debug("server-id=" + entry.getKey() + ", value=" + entry.getValue()); - } - HornetQJMSServerLogger.LOGGER.debug("=======================================================================================\n"); - } - - return resourceArray; - } - catch (Throwable e) - { - HornetQJMSServerLogger.LOGGER.warn(e.getMessage(), e); - return new XAResource[]{}; - } - } - - public static HornetQRecoveryRegistry getInstance() - { - return theInstance; - } - - /** - * This will be called by then resource adapters, to register a new discovery - * - * @param resourceConfig - */ - public void register(final XARecoveryConfig resourceConfig) - { - RecoveryDiscovery newInstance = new RecoveryDiscovery(resourceConfig); - RecoveryDiscovery discoveryRecord = configSet.putIfAbsent(resourceConfig, newInstance); - if (discoveryRecord == null) - { - discoveryRecord = newInstance; - 
discoveryRecord.start(false); - } - // you could have a configuration shared with multiple MDBs or RAs - discoveryRecord.incrementUsage(); - } - - /** - * Reference counts and deactivate a configuration - * Notice: this won't remove the servers since a server may have previous XIDs - * - * @param resourceConfig - */ - public void unRegister(final XARecoveryConfig resourceConfig) - { - RecoveryDiscovery discoveryRecord = configSet.get(resourceConfig); - if (discoveryRecord != null && discoveryRecord.decrementUsage() == 0) - { - discoveryRecord = configSet.remove(resourceConfig); - if (discoveryRecord != null) - { - discoveryRecord.stop(); - } - } - } - - /** - * We need to make sure that all resources are closed, we don't actually do this when a resourceConfig is closed but - * maybe we should. - */ - public void stop() - { - for (RecoveryDiscovery recoveryDiscovery : configSet.values()) - { - recoveryDiscovery.stop(); - } - for (HornetQXAResourceWrapper hornetQXAResourceWrapper : recoveries.values()) - { - hornetQXAResourceWrapper.close(); - } - recoveries.clear(); - configSet.clear(); - } - - /** - * in case of a failure the Discovery will register itslef to retry - * - * @param failedDiscovery - */ - public void failedDiscovery(RecoveryDiscovery failedDiscovery) - { - HornetQJMSServerLogger.LOGGER.debug("RecoveryDiscovery being set to restart:" + failedDiscovery); - synchronized (failedDiscoverySet) - { - failedDiscoverySet.add(failedDiscovery); - } - } - - /** - * @param nodeID - * @param networkConfiguration - * @param username - * @param password - */ - public void nodeUp(String nodeID, - Pair<TransportConfiguration, TransportConfiguration> networkConfiguration, - String username, - String password) - { - - if (recoveries.get(nodeID) == null) - { - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug(nodeID + " being registered towards " + networkConfiguration); - } - XARecoveryConfig config = new XARecoveryConfig(true, 
- extractTransportConfiguration(networkConfiguration), - username, - password); - - HornetQXAResourceWrapper wrapper = new HornetQXAResourceWrapper(config); - recoveries.putIfAbsent(nodeID, wrapper); - } - } - - public void nodeDown(String nodeID) - { - } - - /** - * this will go through the list of retries - */ - private void checkFailures() - { - final HashSet<RecoveryDiscovery> failures = new HashSet<RecoveryDiscovery>(); - - // it will transfer all the discoveries to a new collection - synchronized (failedDiscoverySet) - { - failures.addAll(failedDiscoverySet); - failedDiscoverySet.clear(); - } - - if (failures.size() > 0) - { - // This shouldn't happen on a regular scenario, however when this retry happens this needs - // to be done on a new thread - Thread t = new Thread("HornetQ Recovery Discovery Reinitialization") - { - @Override - public void run() - { - for (RecoveryDiscovery discovery : failures) - { - try - { - HornetQJMSServerLogger.LOGGER.debug("Retrying discovery " + discovery); - discovery.start(true); - } - catch (Throwable e) - { - HornetQJMSServerLogger.LOGGER.warn(e.getMessage(), e); - } - } - } - }; - - t.start(); - } - } - - /** - * @param networkConfiguration - * @return - */ - private TransportConfiguration[] extractTransportConfiguration(Pair<TransportConfiguration, TransportConfiguration> networkConfiguration) - { - if (networkConfiguration.getB() != null) - { - return new TransportConfiguration[]{networkConfiguration.getA(), networkConfiguration.getB()}; - } - return new TransportConfiguration[]{networkConfiguration.getA()}; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRegistryBase.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRegistryBase.java 
b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRegistryBase.java deleted file mode 100644 index c9381bb..0000000 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRegistryBase.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.server.recovery; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.jboss.tm.XAResourceRecoveryRegistry; - -/** - * This class is a base class for the integration layer where - * This class is used on integration points and this is just a bridge to the real registry at - * {@link HornetQRecoveryRegistry} - * - * @author Clebert - * - * - */ -public abstract class HornetQRegistryBase -{ - - private final AtomicBoolean started = new AtomicBoolean(false); - - public HornetQRegistryBase() - { - } - - - public abstract XAResourceRecoveryRegistry getTMRegistry(); - - public void register(final XARecoveryConfig resourceConfig) - { - init(); - HornetQRecoveryRegistry.getInstance().register(resourceConfig); - } - - - - public void unRegister(final XARecoveryConfig resourceConfig) - { - init(); - HornetQRecoveryRegistry.getInstance().unRegister(resourceConfig); - } - - public void stop() - { - if (started.compareAndSet(true, false) && getTMRegistry() != null) - { - 
getTMRegistry().removeXAResourceRecovery(HornetQRecoveryRegistry.getInstance()); - HornetQRecoveryRegistry.getInstance().stop(); - } - } - - private void init() - { - if (started.compareAndSet(false, true) && getTMRegistry() != null) - { - getTMRegistry().addXAResourceRecovery(HornetQRecoveryRegistry.getInstance()); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceRecovery.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceRecovery.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceRecovery.java deleted file mode 100644 index 448e6b1..0000000 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceRecovery.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ -package org.apache.activemq.jms.server.recovery; - -import javax.transaction.xa.XAResource; -import java.util.HashMap; -import java.util.Map; - -import com.arjuna.ats.jta.recovery.XAResourceRecovery; -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.jms.server.HornetQJMSServerLogger; - -/** - * A XAResourceRecovery instance that can be used to recover any JMS provider. - * <p> - * In reality only recover, rollback and commit will be called but we still need to be implement all - * methods just in case. - * <p> - * To enable this add the following to the jbossts-properties file - * <pre> - * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1" - * value="org.apache.activemq.jms.server.recovery.HornetQXAResourceRecovery;org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory"/> - * </pre> - * <p> - * you'll need something like this if the HornetQ Server is remote - * <pre> - * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2" - * value="org.apache.activemq.jms.server.recovery.HornetQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/> - * </pre> - * <p> - * you'll need something like this if the HornetQ Server is remote and has failover configured - * <pre> - * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2" - * value="org.apache.activemq.jms.server.recovery.HornetQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/> - * </pre> - * - * @author <a href="mailto:[email protected]">Tim Fox</a> - * @author <a href="mailto:[email protected]">Andy Taylor</a> - * @version <tt>$Revision: 1.1 $</tt> - */ -public class HornetQXAResourceRecovery implements XAResourceRecovery -{ - private final boolean trace = 
HornetQJMSServerLogger.LOGGER.isTraceEnabled(); - - private boolean hasMore; - - private HornetQXAResourceWrapper res; - - public HornetQXAResourceRecovery() - { - if (trace) - { - HornetQJMSServerLogger.LOGGER.trace("Constructing HornetQXAResourceRecovery"); - } - } - - public boolean initialise(final String config) - { - if (HornetQJMSServerLogger.LOGGER.isTraceEnabled()) - { - HornetQJMSServerLogger.LOGGER.trace(this + " intialise: " + config); - } - - String[] configs = config.split(";"); - XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length]; - for (int i = 0, configsLength = configs.length; i < configsLength; i++) - { - String s = configs[i]; - ConfigParser parser = new ConfigParser(s); - String connectorFactoryClassName = parser.getConnectorFactoryClassName(); - Map<String, Object> connectorParams = parser.getConnectorParameters(); - String username = parser.getUsername(); - String password = parser.getPassword(); - TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams); - xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password); - } - - - res = new HornetQXAResourceWrapper(xaRecoveryConfigs); - - if (HornetQJMSServerLogger.LOGGER.isTraceEnabled()) - { - HornetQJMSServerLogger.LOGGER.trace(this + " initialised"); - } - - return true; - } - - public boolean hasMoreResources() - { - if (HornetQJMSServerLogger.LOGGER.isTraceEnabled()) - { - HornetQJMSServerLogger.LOGGER.trace(this + " hasMoreResources"); - } - - /* - * The way hasMoreResources is supposed to work is as follows: - * For each "sweep" the recovery manager will call hasMoreResources, then if it returns - * true it will call getXAResource. - * It will repeat that until hasMoreResources returns false. - * Then the sweep is over. - * For the next sweep hasMoreResources should return true, etc. 
- * - * In our case where we only need to return one XAResource per sweep, - * hasMoreResources should basically alternate between true and false. - * - * - */ - - hasMore = !hasMore; - - return hasMore; - } - - public XAResource getXAResource() - { - if (HornetQJMSServerLogger.LOGGER.isTraceEnabled()) - { - HornetQJMSServerLogger.LOGGER.trace(this + " getXAResource"); - } - - return res; - } - - public XAResource[] getXAResources() - { - return new XAResource[]{res}; - } - - @Override - protected void finalize() - { - res.close(); - } - - public static class ConfigParser - { - private final String connectorFactoryClassName; - - private final Map<String, Object> connectorParameters; - - private String username; - - private String password; - - public ConfigParser(final String config) - { - if (config == null || config.length() == 0) - { - throw new IllegalArgumentException("Must specify provider connector factory class name in config"); - } - - String[] strings = config.split(","); - - // First (mandatory) param is the connector factory class name - if (strings.length < 1) - { - throw new IllegalArgumentException("Must specify provider connector factory class name in config"); - } - - connectorFactoryClassName = strings[0].trim(); - - // Next two (optional) parameters are the username and password to use for creating the session for recovery - - if (strings.length >= 2) - { - - username = strings[1].trim(); - if (username.length() == 0) - { - username = null; - } - - if (strings.length == 2) - { - throw new IllegalArgumentException("If username is specified, password must be specified too"); - } - - password = strings[2].trim(); - if (password.length() == 0) - { - password = null; - } - } - - // other tokens are for connector configurations - connectorParameters = new HashMap<String, Object>(); - if (strings.length >= 3) - { - for (int i = 3; i < strings.length; i++) - { - String[] str = strings[i].split("="); - if (str.length == 2) - { - 
connectorParameters.put(str[0].trim(), str[1].trim()); - } - } - } - } - - public String getConnectorFactoryClassName() - { - return connectorFactoryClassName; - } - - public Map<String, Object> getConnectorParameters() - { - return connectorParameters; - } - - public String getUsername() - { - return username; - } - - public String getPassword() - { - return password; - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceWrapper.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceWrapper.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceWrapper.java deleted file mode 100644 index 293137b..0000000 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceWrapper.java +++ /dev/null @@ -1,531 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ -package org.apache.activemq.jms.server.recovery; - -import java.util.Arrays; - -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; - -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.ActiveMQExceptionType; -import org.apache.activemq.api.core.ActiveMQNotConnectedException; -import org.apache.activemq.api.core.client.ClientSession; -import org.apache.activemq.api.core.client.ClientSessionFactory; -import org.apache.activemq.api.core.client.HornetQClient; -import org.apache.activemq.api.core.client.ServerLocator; -import org.apache.activemq.api.core.client.SessionFailureListener; -import org.apache.activemq.jms.server.HornetQJMSServerLogger; - -/** - * XAResourceWrapper. - * - * Mainly from org.jboss.server.XAResourceWrapper from the JBoss AS server module - * - * The reason why we don't use that class directly is that it assumes on failure of connection - * the RM_FAIL or RM_ERR is thrown, but in HornetQ we throw XA_RETRY since we want the recovery manager to be able - * to retry on failure without having to manually retry - * - * @author <a href="[email protected]">Adrian Brock</a> - * @author <a href="[email protected]">Tim Fox</a> - * @author <a href="mailto:[email protected]">Jeff Mesnil</a> - * @author <a href="mailto:[email protected]">Andy Taylor</a> - * - * @version $Revision: 45341 $ - */ -public class HornetQXAResourceWrapper implements XAResource, SessionFailureListener -{ - /** The state lock */ - private static final Object lock = new Object(); - - private ServerLocator serverLocator; - - private ClientSessionFactory csf; - - private ClientSession delegate; - - private XARecoveryConfig[] xaRecoveryConfigs; - - public HornetQXAResourceWrapper(XARecoveryConfig... 
xaRecoveryConfigs) - { - this.xaRecoveryConfigs = xaRecoveryConfigs; - - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("Recovery configured with " + Arrays.toString(xaRecoveryConfigs) + - ", instance=" + - System.identityHashCode(this)); - } - } - - public Xid[] recover(final int flag) throws XAException - { - XAResource xaResource = getDelegate(false); - - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("looking for recover at " + xaResource + " configuration " + Arrays.toString(this.xaRecoveryConfigs)); - } - - try - { - Xid[] xids = xaResource.recover(flag); - - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled() && xids != null && xids.length > 0) - { - HornetQJMSServerLogger.LOGGER.debug("Recovering these following IDs " + Arrays.toString(xids) + " at " + this); - } - - return xids; - } - catch (XAException e) - { - HornetQJMSServerLogger.LOGGER.xaRecoverError(e); - throw check(e); - } - } - - public void commit(final Xid xid, final boolean onePhase) throws XAException - { - XAResource xaResource = getDelegate(true); - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("Commit " + xaResource + " xid " + " onePhase=" + onePhase); - } - try - { - xaResource.commit(xid, onePhase); - } - catch (XAException e) - { - throw check(e); - } - } - - public void rollback(final Xid xid) throws XAException - { - XAResource xaResource = getDelegate(true); - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("Rollback " + xaResource + " xid "); - } - try - { - xaResource.rollback(xid); - } - catch (XAException e) - { - throw check(e); - } - } - - public void forget(final Xid xid) throws XAException - { - XAResource xaResource = getDelegate(false); - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("Forget " + xaResource + " xid "); - } - - try - { - 
xaResource.forget(xid); - } - catch (XAException e) - { - throw check(e); - } - } - - public boolean isSameRM(XAResource xaRes) throws XAException - { - if (xaRes instanceof HornetQXAResourceWrapper) - { - xaRes = ((HornetQXAResourceWrapper)xaRes).getDelegate(false); - } - - XAResource xaResource = getDelegate(false); - try - { - return xaResource.isSameRM(xaRes); - } - catch (XAException e) - { - throw check(e); - } - } - - public int prepare(final Xid xid) throws XAException - { - XAResource xaResource = getDelegate(true); - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("prepare " + xaResource + " xid "); - } - try - { - return xaResource.prepare(xid); - } - catch (XAException e) - { - throw check(e); - } - } - - public void start(final Xid xid, final int flags) throws XAException - { - XAResource xaResource = getDelegate(false); - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("start " + xaResource + " xid "); - } - try - { - xaResource.start(xid, flags); - } - catch (XAException e) - { - throw check(e); - } - } - - public void end(final Xid xid, final int flags) throws XAException - { - XAResource xaResource = getDelegate(false); - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("end " + xaResource + " xid "); - } - try - { - xaResource.end(xid, flags); - } - catch (XAException e) - { - throw check(e); - } - } - - public int getTransactionTimeout() throws XAException - { - XAResource xaResource = getDelegate(false); - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("getTransactionTimeout " + xaResource + " xid "); - } - try - { - return xaResource.getTransactionTimeout(); - } - catch (XAException e) - { - throw check(e); - } - } - - public boolean setTransactionTimeout(final int seconds) throws XAException - { - XAResource xaResource = getDelegate(false); - if 
(HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("setTransactionTimeout " + xaResource + " xid "); - } - try - { - return xaResource.setTransactionTimeout(seconds); - } - catch (XAException e) - { - throw check(e); - } - } - - public void connectionFailed(final ActiveMQException me, boolean failedOver) - { - if (me.getType() == ActiveMQExceptionType.DISCONNECTED) - { - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("being disconnected for server shutdown", me); - } - } - else - { - HornetQJMSServerLogger.LOGGER.xaRecoverConnectionError(me, csf); - } - close(); - } - - @Override - public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) - { - connectionFailed(me, failedOver); - } - - public void beforeReconnect(final ActiveMQException me) - { - } - - /** - * Get the connectionFactory XAResource - * - * @return the connectionFactory - * @throws XAException for any problem - */ - private XAResource getDelegate(boolean retry) throws XAException - { - XAResource result = null; - Exception error = null; - try - { - result = connect(); - } - catch (Exception e) - { - error = e; - } - - if (result == null) - { - // we should always throw a retry for certain methods comit etc, if not the tx is marked as a heuristic and - // all chaos is let loose - if (retry) - { - XAException xae = new XAException("Connection unavailable for xa recovery"); - xae.errorCode = XAException.XA_RETRY; - if (error != null) - { - xae.initCause(error); - } - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae); - } - throw xae; - } - else - { - XAException xae = new XAException("Error trying to connect to any providers for xa recovery"); - xae.errorCode = XAException.XAER_RMERR; - if (error != null) - { - xae.initCause(error); - } - if 
(HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae); - } - throw xae; - } - - } - - return result; - } - - /** - * Connect to the server if not already done so - * - * @return the connectionFactory XAResource - * @throws Exception for any problem - */ - protected XAResource connect() throws Exception - { - // Do we already have a valid connectionFactory? - synchronized (HornetQXAResourceWrapper.lock) - { - if (delegate != null) - { - return delegate; - } - } - - for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs) - { - - if (xaRecoveryConfig == null) - { - continue; - } - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - HornetQJMSServerLogger.LOGGER.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs)); - } - - ClientSession cs = null; - - try - { - // setting ha=false because otherwise the connector would go towards any server, causing Heuristic exceptions - // we really need to control what server it's connected to - - // Manual configuration may still use discovery, so we will keep this - if (xaRecoveryConfig.getDiscoveryConfiguration() != null) - { - serverLocator = HornetQClient.createServerLocator(false, xaRecoveryConfig.getDiscoveryConfiguration()); - } - else - { - serverLocator = HornetQClient.createServerLocator(false, xaRecoveryConfig.getTransportConfig()); - } - serverLocator.disableFinalizeCheck(); - csf = serverLocator.createSessionFactory(); - if (xaRecoveryConfig.getUsername() == null) - { - cs = csf.createSession(true, false, false); - } - else - { - cs = csf.createSession(xaRecoveryConfig.getUsername(), - xaRecoveryConfig.getPassword(), - true, - false, - false, - false, - 1); - } - } - catch (Throwable e) - { - HornetQJMSServerLogger.LOGGER.xaRecoverAutoConnectionError(e, xaRecoveryConfig); - if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) - { - 
HornetQJMSServerLogger.LOGGER.debug(e.getMessage(), e); - } - - try - { - if (cs != null) cs.close(); - if (serverLocator != null) serverLocator.close(); - } - catch (Throwable ignored) - { - if (HornetQJMSServerLogger.LOGGER.isTraceEnabled()) - { - HornetQJMSServerLogger.LOGGER.trace(e.getMessage(), ignored); - } - } - continue; - } - - cs.addFailureListener(this); - - synchronized (HornetQXAResourceWrapper.lock) - { - delegate = cs; - } - - return delegate; - } - HornetQJMSServerLogger.LOGGER.recoveryConnectFailed(Arrays.toString(xaRecoveryConfigs)); - throw new ActiveMQNotConnectedException(); - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - @Override - public String toString() - { - return "HornetQXAResourceWrapper [serverLocator=" + serverLocator + - ", csf=" + - csf + - ", delegate=" + - delegate + - ", xaRecoveryConfigs=" + - Arrays.toString(xaRecoveryConfigs) + - ", instance=" + - System.identityHashCode(this) + - "]"; - } - - /** - * Close the connection - */ - public void close() - { - ServerLocator oldServerLocator = null; - ClientSessionFactory oldCSF = null; - ClientSession oldDelegate = null; - synchronized (HornetQXAResourceWrapper.lock) - { - oldCSF = csf; - csf = null; - oldDelegate = delegate; - delegate = null; - oldServerLocator = serverLocator; - serverLocator = null; - } - - if (oldDelegate != null) - { - try - { - oldDelegate.close(); - } - catch (Throwable ignorable) - { - HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable); - } - } - - if (oldCSF != null) - { - try - { - oldCSF.close(); - } - catch (Throwable ignorable) - { - HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable); - } - } - - if (oldServerLocator != null) - { - try - { - oldServerLocator.close(); - } - catch (Throwable ignorable) - { - HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable); - } - } - } - - /** - * Check whether an XAException is fatal. 
If it is an RM problem - * we close the connection so the next call will reconnect. - * - * @param e the xa exception - * @return never - * @throws XAException always - */ - protected XAException check(final XAException e) throws XAException - { - HornetQJMSServerLogger.LOGGER.xaRecoveryError(e); - - - // If any exception happened, we close the connection so we may start fresh - close(); - throw e; - } - - @Override - protected void finalize() throws Throwable - { - close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java index 7510361..b705db9 100644 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java @@ -23,12 +23,12 @@ import org.apache.activemq.api.core.client.ServerLocator; import org.apache.activemq.api.core.client.SessionFailureListener; import org.apache.activemq.api.core.client.TopologyMember; import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal; -import org.apache.activemq.jms.server.HornetQJMSServerLogger; +import org.apache.activemq.jms.server.ActiveMQJMSServerLogger; /** * <p>This class will have a simple Connection Factory and will listen * for topology updates. 
</p> - * <p>This Discovery is instantiated by {@link HornetQRecoveryRegistry} + * <p>This Discovery is instantiated by {@link ActiveMQRecoveryRegistry} * * @author clebertsuconic */ @@ -51,7 +51,7 @@ public class RecoveryDiscovery implements SessionFailureListener { if (!started) { - HornetQJMSServerLogger.LOGGER.debug("Starting RecoveryDiscovery on " + config); + ActiveMQJMSServerLogger.LOGGER.debug("Starting RecoveryDiscovery on " + config); started = true; locator = config.createServerLocator(); @@ -65,16 +65,16 @@ public class RecoveryDiscovery implements SessionFailureListener // in case of failure we will retry sessionFactory.addFailureListener(this); - HornetQJMSServerLogger.LOGGER.debug("RecoveryDiscovery started fine on " + config); + ActiveMQJMSServerLogger.LOGGER.debug("RecoveryDiscovery started fine on " + config); } catch (Exception startupError) { if (!retry) { - HornetQJMSServerLogger.LOGGER.xaRecoveryStartError(config); + ActiveMQJMSServerLogger.LOGGER.xaRecoveryStartError(config); } stop(); - HornetQRecoveryRegistry.getInstance().failedDiscovery(this); + ActiveMQRecoveryRegistry.getInstance().failedDiscovery(this); } } @@ -123,7 +123,7 @@ public class RecoveryDiscovery implements SessionFailureListener } catch (Exception ignored) { - HornetQJMSServerLogger.LOGGER.debug(ignored, ignored); + ActiveMQJMSServerLogger.LOGGER.debug(ignored, ignored); } try @@ -132,7 +132,7 @@ public class RecoveryDiscovery implements SessionFailureListener } catch (Exception ignored) { - HornetQJMSServerLogger.LOGGER.debug(ignored, ignored); + ActiveMQJMSServerLogger.LOGGER.debug(ignored, ignored); } sessionFactory = null; @@ -160,7 +160,7 @@ public class RecoveryDiscovery implements SessionFailureListener Pair<TransportConfiguration, TransportConfiguration> connector = new Pair<TransportConfiguration, TransportConfiguration>(topologyMember.getLive(), topologyMember.getBackup()); - HornetQRecoveryRegistry.getInstance().nodeUp(topologyMember.getNodeId(), connector, + 
ActiveMQRecoveryRegistry.getInstance().nodeUp(topologyMember.getNodeId(), connector, config.getUsername(), config.getPassword()); } } @@ -180,15 +180,15 @@ public class RecoveryDiscovery implements SessionFailureListener { if (exception.getType() == ActiveMQExceptionType.DISCONNECTED) { - HornetQJMSServerLogger.LOGGER.warn("being disconnected for server shutdown", exception); + ActiveMQJMSServerLogger.LOGGER.warn("being disconnected for server shutdown", exception); } else { - HornetQJMSServerLogger.LOGGER.warn("Notified of connection failure in xa discovery, we will retry on the next recovery", + ActiveMQJMSServerLogger.LOGGER.warn("Notified of connection failure in xa discovery, we will retry on the next recovery", exception); } internalStop(); - HornetQRecoveryRegistry.getInstance().failedDiscovery(this); + ActiveMQRecoveryRegistry.getInstance().failedDiscovery(this); } @Override http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java index 6c00917..24e40c0 100644 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java @@ -16,9 +16,9 @@ import java.util.Arrays; import org.apache.activemq.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.HornetQClient; +import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.api.core.client.ServerLocator; -import org.apache.activemq.jms.client.HornetQConnectionFactory; +import 
org.apache.activemq.jms.client.ActiveMQConnectionFactory; /** * @@ -40,7 +40,7 @@ public class XARecoveryConfig private final String username; private final String password; - public static XARecoveryConfig newConfig(HornetQConnectionFactory factory, + public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory, String userName, String password) { @@ -107,11 +107,11 @@ public class XARecoveryConfig { if (getDiscoveryConfiguration() != null) { - return HornetQClient.createServerLocator(isHA(), getDiscoveryConfiguration()); + return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()); } else { - return HornetQClient.createServerLocator(isHA(), getTransportConfig()); + return ActiveMQClient.createServerLocator(isHA(), getTransportConfig()); } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java index 7d0ae6e..a44d114 100644 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java @@ -12,10 +12,10 @@ */ /** * This package is used to locate resources and connectors along the cluster set - * I - JCA Connection Factories or InBound MDBs will call HornetQRegistryBase::register(XARecoveryConfig) + * I - JCA Connection Factories or InBound MDBs will call ActiveMQRegistryBase::register(XARecoveryConfig) * II - For each XARecoveryConfig the RegistryBase will instantiate a ResourceDiscoveryUnit which will * connect using that configuration and inform the Registry of any topology members - * III - For each topology member found on the 
DiscoveryUnits, the RegistryBase will registry a HornetQResourceRecovery + * III - For each topology member found on the DiscoveryUnits, the RegistryBase will registry a ActiveMQResourceRecovery * that will exist per server */ package org.apache.activemq.jms.server.recovery;
