http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRecoveryRegistry.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRecoveryRegistry.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRecoveryRegistry.java new file mode 100644 index 0000000..094c4be --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRecoveryRegistry.java @@ -0,0 +1,250 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.server.recovery; + +import javax.transaction.xa.XAResource; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.jms.server.HornetQJMSServerLogger; +import org.jboss.tm.XAResourceRecovery; + +/** + * <p>This class is used by the Resource Adapter to register RecoveryDiscovery, which is based on the {@link XARecoveryConfig}</p> + * <p>Each outbound or inboud connection will pass the configuration here through by calling the method {@link HornetQRecoveryRegistry#register(XARecoveryConfig)}</p> + * <p>Later the {@link RecoveryDiscovery} will call {@link HornetQRecoveryRegistry#nodeUp(String, Pair, String, String)} + * so we will keep a track of nodes on the cluster + * or nodes where this server is connected to. </p> + * + * @author clebertsuconic + */ +public class HornetQRecoveryRegistry implements XAResourceRecovery +{ + + private static final HornetQRecoveryRegistry theInstance = new HornetQRecoveryRegistry(); + + private final ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery> configSet = new ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery>(); + + /** + * The list by server id and resource adapter wrapper, what will actually be calling recovery. + * This will be returned by getXAResources + */ + private final ConcurrentHashMap<String, HornetQXAResourceWrapper> recoveries = new ConcurrentHashMap<String, HornetQXAResourceWrapper>(); + + /** + * In case of failures, we retry on the next getXAResources + */ + private final Set<RecoveryDiscovery> failedDiscoverySet = new HashSet<RecoveryDiscovery>(); + + private HornetQRecoveryRegistry() + { + } + + /** + * This will be called periodically by the Transaction Manager + */ + public XAResource[] getXAResources() + { + try + { + checkFailures(); + + HornetQXAResourceWrapper[] resourceArray = new HornetQXAResourceWrapper[recoveries.size()]; + resourceArray = recoveries.values().toArray(resourceArray); + + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("\n======================================================================================="); + HornetQJMSServerLogger.LOGGER.debug("Returning the following list on getXAREsources:"); + for (Map.Entry<String, HornetQXAResourceWrapper> entry : recoveries.entrySet()) + { + HornetQJMSServerLogger.LOGGER.debug("server-id=" + entry.getKey() + ", value=" + entry.getValue()); + } + HornetQJMSServerLogger.LOGGER.debug("=======================================================================================\n"); + } + + return resourceArray; + } + catch (Throwable e) + { + HornetQJMSServerLogger.LOGGER.warn(e.getMessage(), e); + return new XAResource[]{}; + } + } + + public static HornetQRecoveryRegistry getInstance() + { + return theInstance; + } + + /** + * This will be called by then resource adapters, to register a new discovery + * + * @param resourceConfig + */ + public void register(final XARecoveryConfig resourceConfig) + { + RecoveryDiscovery newInstance = new RecoveryDiscovery(resourceConfig); + RecoveryDiscovery discoveryRecord = configSet.putIfAbsent(resourceConfig, newInstance); + if (discoveryRecord == null) + { + discoveryRecord = newInstance; + discoveryRecord.start(false); + } + // you could have a configuration shared with multiple MDBs or RAs + discoveryRecord.incrementUsage(); + } + + /** + * Reference counts and deactivate a configuration + * Notice: this won't remove the servers since a server may have previous XIDs + * + * @param resourceConfig + */ + public void unRegister(final XARecoveryConfig resourceConfig) + { + RecoveryDiscovery discoveryRecord = configSet.get(resourceConfig); + if (discoveryRecord != null && discoveryRecord.decrementUsage() == 0) + { + discoveryRecord = configSet.remove(resourceConfig); + if (discoveryRecord != null) + { + discoveryRecord.stop(); + } + } + } + + /** + * We need to make sure that all resources are closed, we don't actually do this when a resourceConfig is closed but + * maybe we should. + */ + public void stop() + { + for (RecoveryDiscovery recoveryDiscovery : configSet.values()) + { + recoveryDiscovery.stop(); + } + for (HornetQXAResourceWrapper hornetQXAResourceWrapper : recoveries.values()) + { + hornetQXAResourceWrapper.close(); + } + recoveries.clear(); + configSet.clear(); + } + + /** + * in case of a failure the Discovery will register itslef to retry + * + * @param failedDiscovery + */ + public void failedDiscovery(RecoveryDiscovery failedDiscovery) + { + HornetQJMSServerLogger.LOGGER.debug("RecoveryDiscovery being set to restart:" + failedDiscovery); + synchronized (failedDiscoverySet) + { + failedDiscoverySet.add(failedDiscovery); + } + } + + /** + * @param nodeID + * @param networkConfiguration + * @param username + * @param password + */ + public void nodeUp(String nodeID, + Pair<TransportConfiguration, TransportConfiguration> networkConfiguration, + String username, + String password) + { + + if (recoveries.get(nodeID) == null) + { + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug(nodeID + " being registered towards " + networkConfiguration); + } + XARecoveryConfig config = new XARecoveryConfig(true, + extractTransportConfiguration(networkConfiguration), + username, + password); + + HornetQXAResourceWrapper wrapper = new HornetQXAResourceWrapper(config); + recoveries.putIfAbsent(nodeID, wrapper); + } + } + + public void nodeDown(String nodeID) + { + } + + /** + * this will go through the list of retries + */ + private void checkFailures() + { + final HashSet<RecoveryDiscovery> failures = new HashSet<RecoveryDiscovery>(); + + // it will transfer all the discoveries to a new collection + synchronized (failedDiscoverySet) + { + failures.addAll(failedDiscoverySet); + failedDiscoverySet.clear(); + } + + if (failures.size() > 0) + { + // This shouldn't happen on a regular scenario, however when this retry happens this needs + // to be done on a new thread + Thread t = new Thread("HornetQ Recovery Discovery Reinitialization") + { + @Override + public void run() + { + for (RecoveryDiscovery discovery : failures) + { + try + { + HornetQJMSServerLogger.LOGGER.debug("Retrying discovery " + discovery); + discovery.start(true); + } + catch (Throwable e) + { + HornetQJMSServerLogger.LOGGER.warn(e.getMessage(), e); + } + } + } + }; + + t.start(); + } + } + + /** + * @param networkConfiguration + * @return + */ + private TransportConfiguration[] extractTransportConfiguration(Pair<TransportConfiguration, TransportConfiguration> networkConfiguration) + { + if (networkConfiguration.getB() != null) + { + return new TransportConfiguration[]{networkConfiguration.getA(), networkConfiguration.getB()}; + } + return new TransportConfiguration[]{networkConfiguration.getA()}; + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRegistryBase.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRegistryBase.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRegistryBase.java new file mode 100644 index 0000000..a0b7f5e --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRegistryBase.java @@ -0,0 +1,71 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.server.recovery; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.jboss.tm.XAResourceRecoveryRegistry; + +/** + * This class is a base class for the integration layer where + * This class is used on integration points and this is just a bridge to the real registry at + * {@link HornetQRecoveryRegistry} + * + * @author Clebert + * + * + */ +public abstract class HornetQRegistryBase +{ + + private final AtomicBoolean started = new AtomicBoolean(false); + + public HornetQRegistryBase() + { + } + + + public abstract XAResourceRecoveryRegistry getTMRegistry(); + + public void register(final XARecoveryConfig resourceConfig) + { + init(); + HornetQRecoveryRegistry.getInstance().register(resourceConfig); + } + + + + public void unRegister(final XARecoveryConfig resourceConfig) + { + init(); + HornetQRecoveryRegistry.getInstance().unRegister(resourceConfig); + } + + public void stop() + { + if (started.compareAndSet(true, false) && getTMRegistry() != null) + { + getTMRegistry().removeXAResourceRecovery(HornetQRecoveryRegistry.getInstance()); + HornetQRecoveryRegistry.getInstance().stop(); + } + } + + private void init() + { + if (started.compareAndSet(false, true) && getTMRegistry() != null) + { + getTMRegistry().addXAResourceRecovery(HornetQRecoveryRegistry.getInstance()); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceRecovery.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceRecovery.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceRecovery.java new file mode 100644 index 0000000..947ddb6 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceRecovery.java @@ -0,0 +1,231 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.server.recovery; + +import javax.transaction.xa.XAResource; +import java.util.HashMap; +import java.util.Map; + +import com.arjuna.ats.jta.recovery.XAResourceRecovery; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.jms.server.HornetQJMSServerLogger; + +/** + * A XAResourceRecovery instance that can be used to recover any JMS provider. + * <p> + * In reality only recover, rollback and commit will be called but we still need to be implement all + * methods just in case. + * <p> + * To enable this add the following to the jbossts-properties file + * <pre> + * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1" + * value="org.apache.activemq6.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/> + * </pre> + * <p> + * you'll need something like this if the HornetQ Server is remote + * <pre> + * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2" + * value="org.apache.activemq6.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/> + * </pre> + * <p> + * you'll need something like this if the HornetQ Server is remote and has failover configured + * <pre> + * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2" + * value="org.apache.activemq6.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/> + * </pre> + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @version <tt>$Revision: 1.1 $</tt> + */ +public class HornetQXAResourceRecovery implements XAResourceRecovery +{ + private final boolean trace = HornetQJMSServerLogger.LOGGER.isTraceEnabled(); + + private boolean hasMore; + + private HornetQXAResourceWrapper res; + + public HornetQXAResourceRecovery() + { + if (trace) + { + HornetQJMSServerLogger.LOGGER.trace("Constructing HornetQXAResourceRecovery"); + } + } + + public boolean initialise(final String config) + { + if (HornetQJMSServerLogger.LOGGER.isTraceEnabled()) + { + HornetQJMSServerLogger.LOGGER.trace(this + " intialise: " + config); + } + + String[] configs = config.split(";"); + XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length]; + for (int i = 0, configsLength = configs.length; i < configsLength; i++) + { + String s = configs[i]; + ConfigParser parser = new ConfigParser(s); + String connectorFactoryClassName = parser.getConnectorFactoryClassName(); + Map<String, Object> connectorParams = parser.getConnectorParameters(); + String username = parser.getUsername(); + String password = parser.getPassword(); + TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams); + xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password); + } + + + res = new HornetQXAResourceWrapper(xaRecoveryConfigs); + + if (HornetQJMSServerLogger.LOGGER.isTraceEnabled()) + { + HornetQJMSServerLogger.LOGGER.trace(this + " initialised"); + } + + return true; + } + + public boolean hasMoreResources() + { + if (HornetQJMSServerLogger.LOGGER.isTraceEnabled()) + { + HornetQJMSServerLogger.LOGGER.trace(this + " hasMoreResources"); + } + + /* + * The way hasMoreResources is supposed to work is as follows: + * For each "sweep" the recovery manager will call hasMoreResources, then if it returns + * true it will call getXAResource. + * It will repeat that until hasMoreResources returns false. + * Then the sweep is over. + * For the next sweep hasMoreResources should return true, etc. + * + * In our case where we only need to return one XAResource per sweep, + * hasMoreResources should basically alternate between true and false. + * + * + */ + + hasMore = !hasMore; + + return hasMore; + } + + public XAResource getXAResource() + { + if (HornetQJMSServerLogger.LOGGER.isTraceEnabled()) + { + HornetQJMSServerLogger.LOGGER.trace(this + " getXAResource"); + } + + return res; + } + + public XAResource[] getXAResources() + { + return new XAResource[]{res}; + } + + @Override + protected void finalize() + { + res.close(); + } + + public static class ConfigParser + { + private final String connectorFactoryClassName; + + private final Map<String, Object> connectorParameters; + + private String username; + + private String password; + + public ConfigParser(final String config) + { + if (config == null || config.length() == 0) + { + throw new IllegalArgumentException("Must specify provider connector factory class name in config"); + } + + String[] strings = config.split(","); + + // First (mandatory) param is the connector factory class name + if (strings.length < 1) + { + throw new IllegalArgumentException("Must specify provider connector factory class name in config"); + } + + connectorFactoryClassName = strings[0].trim(); + + // Next two (optional) parameters are the username and password to use for creating the session for recovery + + if (strings.length >= 2) + { + + username = strings[1].trim(); + if (username.length() == 0) + { + username = null; + } + + if (strings.length == 2) + { + throw new IllegalArgumentException("If username is specified, password must be specified too"); + } + + password = strings[2].trim(); + if (password.length() == 0) + { + password = null; + } + } + + // other tokens are for connector configurations + connectorParameters = new HashMap<String, Object>(); + if (strings.length >= 3) + { + for (int i = 3; i < strings.length; i++) + { + String[] str = strings[i].split("="); + if (str.length == 2) + { + connectorParameters.put(str[0].trim(), str[1].trim()); + } + } + } + } + + public String getConnectorFactoryClassName() + { + return connectorFactoryClassName; + } + + public Map<String, Object> getConnectorParameters() + { + return connectorParameters; + } + + public String getUsername() + { + return username; + } + + public String getPassword() + { + return password; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceWrapper.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceWrapper.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceWrapper.java new file mode 100644 index 0000000..c818362 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceWrapper.java @@ -0,0 +1,531 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.server.recovery; + +import java.util.Arrays; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.api.core.HornetQNotConnectedException; +import org.apache.activemq6.api.core.client.ClientSession; +import org.apache.activemq6.api.core.client.ClientSessionFactory; +import org.apache.activemq6.api.core.client.HornetQClient; +import org.apache.activemq6.api.core.client.ServerLocator; +import org.apache.activemq6.api.core.client.SessionFailureListener; +import org.apache.activemq6.jms.server.HornetQJMSServerLogger; + +/** + * XAResourceWrapper. + * + * Mainly from org.jboss.server.XAResourceWrapper from the JBoss AS server module + * + * The reason why we don't use that class directly is that it assumes on failure of connection + * the RM_FAIL or RM_ERR is thrown, but in HornetQ we throw XA_RETRY since we want the recovery manager to be able + * to retry on failure without having to manually retry + * + * @author <a href="[email protected]">Adrian Brock</a> + * @author <a href="[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * + * @version $Revision: 45341 $ + */ +public class HornetQXAResourceWrapper implements XAResource, SessionFailureListener +{ + /** The state lock */ + private static final Object lock = new Object(); + + private ServerLocator serverLocator; + + private ClientSessionFactory csf; + + private ClientSession delegate; + + private XARecoveryConfig[] xaRecoveryConfigs; + + public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs) + { + this.xaRecoveryConfigs = xaRecoveryConfigs; + + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("Recovery configured with " + Arrays.toString(xaRecoveryConfigs) + + ", instance=" + + System.identityHashCode(this)); + } + } + + public Xid[] recover(final int flag) throws XAException + { + XAResource xaResource = getDelegate(false); + + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("looking for recover at " + xaResource + " configuration " + Arrays.toString(this.xaRecoveryConfigs)); + } + + try + { + Xid[] xids = xaResource.recover(flag); + + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled() && xids != null && xids.length > 0) + { + HornetQJMSServerLogger.LOGGER.debug("Recovering these following IDs " + Arrays.toString(xids) + " at " + this); + } + + return xids; + } + catch (XAException e) + { + HornetQJMSServerLogger.LOGGER.xaRecoverError(e); + throw check(e); + } + } + + public void commit(final Xid xid, final boolean onePhase) throws XAException + { + XAResource xaResource = getDelegate(true); + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("Commit " + xaResource + " xid " + " onePhase=" + onePhase); + } + try + { + xaResource.commit(xid, onePhase); + } + catch (XAException e) + { + throw check(e); + } + } + + public void rollback(final Xid xid) throws XAException + { + XAResource xaResource = getDelegate(true); + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("Rollback " + xaResource + " xid "); + } + try + { + xaResource.rollback(xid); + } + catch (XAException e) + { + throw check(e); + } + } + + public void forget(final Xid xid) throws XAException + { + XAResource xaResource = getDelegate(false); + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("Forget " + xaResource + " xid "); + } + + try + { + xaResource.forget(xid); + } + catch (XAException e) + { + throw check(e); + } + } + + public boolean isSameRM(XAResource xaRes) throws XAException + { + if (xaRes instanceof HornetQXAResourceWrapper) + { + xaRes = ((HornetQXAResourceWrapper)xaRes).getDelegate(false); + } + + XAResource xaResource = getDelegate(false); + try + { + return xaResource.isSameRM(xaRes); + } + catch (XAException e) + { + throw check(e); + } + } + + public int prepare(final Xid xid) throws XAException + { + XAResource xaResource = getDelegate(true); + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("prepare " + xaResource + " xid "); + } + try + { + return xaResource.prepare(xid); + } + catch (XAException e) + { + throw check(e); + } + } + + public void start(final Xid xid, final int flags) throws XAException + { + XAResource xaResource = getDelegate(false); + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("start " + xaResource + " xid "); + } + try + { + xaResource.start(xid, flags); + } + catch (XAException e) + { + throw check(e); + } + } + + public void end(final Xid xid, final int flags) throws XAException + { + XAResource xaResource = getDelegate(false); + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("end " + xaResource + " xid "); + } + try + { + xaResource.end(xid, flags); + } + catch (XAException e) + { + throw check(e); + } + } + + public int getTransactionTimeout() throws XAException + { + XAResource xaResource = getDelegate(false); + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("getTransactionTimeout " + xaResource + " xid "); + } + try + { + return xaResource.getTransactionTimeout(); + } + catch (XAException e) + { + throw check(e); + } + } + + public boolean setTransactionTimeout(final int seconds) throws XAException + { + XAResource xaResource = getDelegate(false); + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("setTransactionTimeout " + xaResource + " xid "); + } + try + { + return xaResource.setTransactionTimeout(seconds); + } + catch (XAException e) + { + throw check(e); + } + } + + public void connectionFailed(final HornetQException me, boolean failedOver) + { + if (me.getType() == HornetQExceptionType.DISCONNECTED) + { + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("being disconnected for server shutdown", me); + } + } + else + { + HornetQJMSServerLogger.LOGGER.xaRecoverConnectionError(me, csf); + } + close(); + } + + @Override + public void connectionFailed(final HornetQException me, boolean failedOver, String scaleDownTargetNodeID) + { + connectionFailed(me, failedOver); + } + + public void beforeReconnect(final HornetQException me) + { + } + + /** + * Get the connectionFactory XAResource + * + * @return the connectionFactory + * @throws XAException for any problem + */ + private XAResource getDelegate(boolean retry) throws XAException + { + XAResource result = null; + Exception error = null; + try + { + result = connect(); + } + catch (Exception e) + { + error = e; + } + + if (result == null) + { + // we should always throw a retry for certain methods comit etc, if not the tx is marked as a heuristic and + // all chaos is let loose + if (retry) + { + XAException xae = new XAException("Connection unavailable for xa recovery"); + xae.errorCode = XAException.XA_RETRY; + if (error != null) + { + xae.initCause(error); + } + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae); + } + throw xae; + } + else + { + XAException xae = new XAException("Error trying to connect to any providers for xa recovery"); + xae.errorCode = XAException.XAER_RMERR; + if (error != null) + { + xae.initCause(error); + } + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae); + } + throw xae; + } + + } + + return result; + } + + /** + * Connect to the server if not already done so + * + * @return the connectionFactory XAResource + * @throws Exception for any problem + */ + protected XAResource connect() throws Exception + { + // Do we already have a valid connectionFactory? + synchronized (HornetQXAResourceWrapper.lock) + { + if (delegate != null) + { + return delegate; + } + } + + for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs) + { + + if (xaRecoveryConfig == null) + { + continue; + } + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs)); + } + + ClientSession cs = null; + + try + { + // setting ha=false because otherwise the connector would go towards any server, causing Heuristic exceptions + // we really need to control what server it's connected to + + // Manual configuration may still use discovery, so we will keep this + if (xaRecoveryConfig.getDiscoveryConfiguration() != null) + { + serverLocator = HornetQClient.createServerLocator(false, xaRecoveryConfig.getDiscoveryConfiguration()); + } + else + { + serverLocator = HornetQClient.createServerLocator(false, xaRecoveryConfig.getTransportConfig()); + } + serverLocator.disableFinalizeCheck(); + csf = serverLocator.createSessionFactory(); + if (xaRecoveryConfig.getUsername() == null) + { + cs = csf.createSession(true, false, false); + } + else + { + cs = csf.createSession(xaRecoveryConfig.getUsername(), + xaRecoveryConfig.getPassword(), + true, + false, + false, + false, + 1); + } + } + catch (Throwable e) + { + HornetQJMSServerLogger.LOGGER.xaRecoverAutoConnectionError(e, xaRecoveryConfig); + if (HornetQJMSServerLogger.LOGGER.isDebugEnabled()) + { + HornetQJMSServerLogger.LOGGER.debug(e.getMessage(), e); + } + + try + { + if (cs != null) cs.close(); + if (serverLocator != null) serverLocator.close(); + } + catch (Throwable ignored) + { + if (HornetQJMSServerLogger.LOGGER.isTraceEnabled()) + { + HornetQJMSServerLogger.LOGGER.trace(e.getMessage(), ignored); + } + } + continue; + } + + cs.addFailureListener(this); + + synchronized (HornetQXAResourceWrapper.lock) + { + delegate = cs; + } + + return delegate; + } + HornetQJMSServerLogger.LOGGER.recoveryConnectFailed(Arrays.toString(xaRecoveryConfigs)); + throw new HornetQNotConnectedException(); + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() + { + return "HornetQXAResourceWrapper [serverLocator=" + serverLocator + + ", csf=" + + csf + + ", delegate=" + + delegate + + ", xaRecoveryConfigs=" + + Arrays.toString(xaRecoveryConfigs) + + ", instance=" + + System.identityHashCode(this) + + "]"; + } + + /** + * Close the connection + */ + public void close() + { + ServerLocator oldServerLocator = null; + ClientSessionFactory oldCSF = null; + ClientSession oldDelegate = null; + synchronized (HornetQXAResourceWrapper.lock) + { + oldCSF = csf; + csf = null; + oldDelegate = delegate; + delegate = null; + oldServerLocator = serverLocator; + serverLocator = null; + } + + if (oldDelegate != null) + { + try + { + oldDelegate.close(); + } + catch (Throwable ignorable) + { + HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable); + } + } + + if (oldCSF != null) + { + try + { + oldCSF.close(); + } + catch (Throwable ignorable) + { + HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable); + } + } + + if (oldServerLocator != null) + { + try + { + oldServerLocator.close(); + } + catch (Throwable ignorable) + { + HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable); + } + } + } + + /** + * Check whether an XAException is fatal. If it is an RM problem + * we close the connection so the next call will reconnect. + * + * @param e the xa exception + * @return never + * @throws XAException always + */ + protected XAException check(final XAException e) throws XAException + { + HornetQJMSServerLogger.LOGGER.xaRecoveryError(e); + + + // If any exception happened, we close the connection so we may start fresh + close(); + throw e; + } + + @Override + protected void finalize() throws Throwable + { + close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/RecoveryDiscovery.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/RecoveryDiscovery.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/RecoveryDiscovery.java new file mode 100644 index 0000000..4291a53 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/RecoveryDiscovery.java @@ -0,0 +1,232 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.server.recovery; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ClusterTopologyListener; +import org.apache.activemq6.api.core.client.ServerLocator; +import org.apache.activemq6.api.core.client.SessionFailureListener; +import org.apache.activemq6.api.core.client.TopologyMember; +import org.apache.activemq6.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq6.jms.server.HornetQJMSServerLogger; + +/** + * <p>This class will have a simple Connection Factory and will listen + * for topology updates. </p> + * <p>This Discovery is instantiated by {@link HornetQRecoveryRegistry} + * + * @author clebertsuconic + */ +public class RecoveryDiscovery implements SessionFailureListener +{ + + private ServerLocator locator; + private ClientSessionFactoryInternal sessionFactory; + private final XARecoveryConfig config; + private final AtomicInteger usage = new AtomicInteger(0); + private boolean started = false; + + + public RecoveryDiscovery(XARecoveryConfig config) + { + this.config = config; + } + + public synchronized void start(boolean retry) + { + if (!started) + { + HornetQJMSServerLogger.LOGGER.debug("Starting RecoveryDiscovery on " + config); + started = true; + + locator = config.createServerLocator(); + locator.disableFinalizeCheck(); + locator.addClusterTopologyListener(new InternalListener(config)); + try + { + sessionFactory = (ClientSessionFactoryInternal) locator.createSessionFactory(); + // We are using the SessionFactoryInternal here directly as we don't have information to connect with an user and password + // on the session as all we want here is to get the topology + // in case of failure we will retry + sessionFactory.addFailureListener(this); + + HornetQJMSServerLogger.LOGGER.debug("RecoveryDiscovery started fine on " + config); + } + catch (Exception startupError) + { + if (!retry) + { + HornetQJMSServerLogger.LOGGER.xaRecoveryStartError(config); + } + stop(); + HornetQRecoveryRegistry.getInstance().failedDiscovery(this); + } + + } + } + + public synchronized void stop() + { + internalStop(); + } + + /** + * we may have several connection factories referencing the same connection recovery entry. + * Because of that we need to make a count of the number of the instances that are referencing it, + * so we will remove it as soon as we are done + */ + public int incrementUsage() + { + return usage.decrementAndGet(); + } + + public int decrementUsage() + { + return usage.incrementAndGet(); + } + + + @Override + protected void finalize() + { + // I don't think it's a good thing to synchronize a method on a finalize, + // hence the internalStop (no sync) call here + internalStop(); + } + + protected void internalStop() + { + if (started) + { + started = false; + try + { + if (sessionFactory != null) + { + sessionFactory.close(); + } + } + catch (Exception ignored) + { + HornetQJMSServerLogger.LOGGER.debug(ignored, ignored); + } + + try + { + locator.close(); + } + catch (Exception ignored) + { + HornetQJMSServerLogger.LOGGER.debug(ignored, ignored); + } + + sessionFactory = null; + locator = null; + } + } + + + static final class InternalListener implements ClusterTopologyListener + { + private final XARecoveryConfig config; + + public InternalListener(final XARecoveryConfig config) + { + this.config = config; + } + + @Override + public void nodeUP(TopologyMember topologyMember, boolean last) + { + // There is a case where the backup announce itself, + // we need to ignore a case where getLive is null + if (topologyMember.getLive() != null) + { + Pair<TransportConfiguration, TransportConfiguration> connector = + new Pair<TransportConfiguration, TransportConfiguration>(topologyMember.getLive(), + topologyMember.getBackup()); + HornetQRecoveryRegistry.getInstance().nodeUp(topologyMember.getNodeId(), connector, + config.getUsername(), config.getPassword()); + } + } + + @Override + public void nodeDown(long eventUID, String nodeID) + { + // I'm not putting any node down, since it may have previous transactions hanging, however at some point we may + //change it have some sort of timeout for removal + } + + } + + + @Override + public void connectionFailed(HornetQException exception, boolean failedOver) + { + if (exception.getType() == HornetQExceptionType.DISCONNECTED) + { + HornetQJMSServerLogger.LOGGER.warn("being disconnected for server shutdown", exception); + } + else + { + HornetQJMSServerLogger.LOGGER.warn("Notified of connection failure in xa discovery, we will retry on the next recovery", + exception); + } + internalStop(); + HornetQRecoveryRegistry.getInstance().failedDiscovery(this); + } + + @Override + public void connectionFailed(final HornetQException me, boolean failedOver, String scaleDownTargetNodeID) + { + connectionFailed(me, failedOver); + } + + @Override + public void beforeReconnect(HornetQException exception) + { + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() + { + return "RecoveryDiscovery [config=" + config + ", started=" + started + "]"; + } + + @Override + public int hashCode() + { + return config.hashCode(); + } + + @Override + public boolean equals(Object o) + { + if (o == null || (!(o instanceof RecoveryDiscovery))) + { + return false; + } + RecoveryDiscovery discovery = (RecoveryDiscovery) o; + + return config.equals(discovery.config); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/XARecoveryConfig.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/XARecoveryConfig.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/XARecoveryConfig.java new file mode 100644 index 0000000..7319f2b --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/XARecoveryConfig.java @@ -0,0 +1,167 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.server.recovery; + +import java.util.Arrays; + +import org.apache.activemq6.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.HornetQClient; +import org.apache.activemq6.api.core.client.ServerLocator; +import org.apache.activemq6.jms.client.HornetQConnectionFactory; + +/** + * + * This represents the configuration of a single connection factory. + * + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @author Clebert Suconic + * + * A wrapper around info needed for the xa recovery resource + * Date: 3/23/11 + * Time: 10:15 AM + */ +public class XARecoveryConfig +{ + + private final boolean ha; + private final TransportConfiguration[] transportConfiguration; + private final DiscoveryGroupConfiguration discoveryConfiguration; + private final String username; + private final String password; + + public static XARecoveryConfig newConfig(HornetQConnectionFactory factory, + String userName, + String password) + { + if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null) + { + return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password); + } + else + { + return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password); + } + + } + + public XARecoveryConfig(final boolean ha, final TransportConfiguration[] transportConfiguration, final String username, final String password) + { + this.transportConfiguration = transportConfiguration; + this.discoveryConfiguration = null; + this.username = username; + this.password = password; + this.ha = ha; + } + + public XARecoveryConfig(final boolean ha, final DiscoveryGroupConfiguration discoveryConfiguration, final String username, final String password) + { + this.discoveryConfiguration = discoveryConfiguration; + this.transportConfiguration = null; + this.username = username; + this.password = password; + this.ha = ha; + } + + public boolean isHA() + { + return ha; + } + + public DiscoveryGroupConfiguration getDiscoveryConfiguration() + { + return discoveryConfiguration; + } + + public TransportConfiguration[] getTransportConfig() + { + return transportConfiguration; + } + + public String getUsername() + { + return username; + } + + public String getPassword() + { + return password; + } + + + /** + * Create a serverLocator using the configuration + * @return locator + */ + public ServerLocator createServerLocator() + { + if (getDiscoveryConfiguration() != null) + { + return HornetQClient.createServerLocator(isHA(), getDiscoveryConfiguration()); + } + else + { + return HornetQClient.createServerLocator(isHA(), getTransportConfig()); + } + + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((discoveryConfiguration == null) ? 0 : discoveryConfiguration.hashCode()); + result = prime * result + Arrays.hashCode(transportConfiguration); + return result; + } + + /* + * We don't use username and password on purpose. + * Just having the connector is enough, as we don't want to duplicate resources just because of usernames + */ + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + XARecoveryConfig other = (XARecoveryConfig)obj; + if (discoveryConfiguration == null) + { + if (other.discoveryConfiguration != null) + return false; + } + else if (!discoveryConfiguration.equals(other.discoveryConfiguration)) + return false; + if (!Arrays.equals(transportConfiguration, other.transportConfiguration)) + return false; + return true; + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() + { + return "XARecoveryConfig [transportConfiguration = " + Arrays.toString(transportConfiguration) + + ", discoveryConfiguration = " + discoveryConfiguration + + ", username=" + + username + + ", password=****]"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/package-info.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/package-info.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/package-info.java new file mode 100644 index 0000000..efcf98a --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/package-info.java @@ -0,0 +1,22 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +/** + * This package is used to locate resources and connectors along the cluster set + * I - JCA Connection Factories or InBound MDBs will call HornetQRegistryBase::register(XARecoveryConfig) + * II - For each XARecoveryConfig the RegistryBase will instantiate a ResourceDiscoveryUnit which will + * connect using that configuration and inform the Registry of any topology members + * III - For each topology member found on the DiscoveryUnits, the RegistryBase will registry a HornetQResourceRecovery + * that will exist per server + */ +package org.apache.activemq6.jms.server.recovery; + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/transaction/JMSTransactionDetail.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/transaction/JMSTransactionDetail.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/transaction/JMSTransactionDetail.java new file mode 100644 index 0000000..d3ef9b1 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/transaction/JMSTransactionDetail.java @@ -0,0 +1,78 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.transaction; + +import java.util.Map; + +import javax.transaction.xa.Xid; + +import org.apache.activemq6.core.server.ServerMessage; +import org.apache.activemq6.core.transaction.Transaction; +import org.apache.activemq6.core.transaction.TransactionDetail; +import org.apache.activemq6.jms.client.HornetQBytesMessage; +import org.apache.activemq6.jms.client.HornetQMapMessage; +import org.apache.activemq6.jms.client.HornetQMessage; +import org.apache.activemq6.jms.client.HornetQObjectMessage; +import org.apache.activemq6.jms.client.HornetQStreamMessage; +import org.apache.activemq6.jms.client.HornetQTextMessage; + +/** + * A JMSTransactionDetail + * + * @author <a href="[email protected]">Tomohisa Igarashi</a> + * + * + */ +public class JMSTransactionDetail extends TransactionDetail +{ + public JMSTransactionDetail(Xid xid, Transaction tx, Long creation) throws Exception + { + super(xid,tx,creation); + } + + @Override + public String decodeMessageType(ServerMessage msg) + { + int type = msg.getType(); + switch (type) + { + case HornetQMessage.TYPE: // 0 + return "Default"; + case HornetQObjectMessage.TYPE: // 2 + return "ObjectMessage"; + case HornetQTextMessage.TYPE: // 3 + return "TextMessage"; + case HornetQBytesMessage.TYPE: // 4 + return "ByteMessage"; + case HornetQMapMessage.TYPE: // 5 + return "MapMessage"; + case HornetQStreamMessage.TYPE: // 6 + return "StreamMessage"; + default: + return "(Unknown Type)"; + } + } + + @Override + public Map<String, Object> decodeMessageProperties(ServerMessage msg) + { + try + { + return HornetQMessage.coreMaptoJMSMap(msg.toMap()); + } + catch (Throwable t) + { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/resources/schema/hornetq-jms.xsd ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/resources/schema/hornetq-jms.xsd b/activemq6-jms-server/src/main/resources/schema/hornetq-jms.xsd new file mode 100644 index 0000000..9add1f9 --- /dev/null +++ b/activemq6-jms-server/src/main/resources/schema/hornetq-jms.xsd @@ -0,0 +1,267 @@ +<?xml version='1.0' encoding='UTF-8'?> + +<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema" + targetNamespace="urn:hornetq" + xmlns="urn:hornetq" + xmlns:hq="urn:org.hornetq" + elementFormDefault="qualified" + attributeFormDefault="unqualified" + version="1.0"> + + <xsd:element name="configuration" hq:schema="hornetq-jms-configuration"> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="jmx-domain" type="xsd:string" default="org.hornetq" + minOccurs="0" maxOccurs="1"/> + <xsd:element ref="connection-factory" maxOccurs="unbounded" minOccurs="0"/> + <xsd:choice maxOccurs="unbounded" minOccurs="0"> + <xsd:element ref="queue" maxOccurs="1" minOccurs="1"/> + <xsd:element ref="topic" maxOccurs="1" minOccurs="1"/> + </xsd:choice> + </xsd:sequence> + </xsd:complexType> + </xsd:element> + + <xsd:element name="connection-factory"> + <xsd:annotation hq:linkend="using-jms.server.configuration"> + <xsd:documentation>a list of connection factories to create and add to + JNDI</xsd:documentation> + </xsd:annotation> + <xsd:complexType> + <xsd:all> + <xsd:element name="xa" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0"> + <xsd:annotation hq:linkend="using-jms.configure.factory.types" + hq:id="configuration.connection-factory.signature.xa"> + <xsd:documentation>Whether this is an XA connection factory</xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="discovery-group-ref" type="discovery-group-refType" maxOccurs="1" minOccurs="0"> + </xsd:element> + + <xsd:element name="connectors" maxOccurs="1" minOccurs="0"> + <xsd:annotation hq:linkend="clusters"> + <xsd:documentation>A sequence of connectors used by the connection factory + </xsd:documentation> + </xsd:annotation> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="connector-ref" maxOccurs="unbounded" minOccurs="1"> + <xsd:annotation> + <xsd:documentation>A connector reference + </xsd:documentation> + </xsd:annotation> + <xsd:complexType> + <xsd:attribute name="connector-name" type="xsd:string" use="required"> + <xsd:annotation> + <xsd:documentation>Name of the connector to connect to the live server + </xsd:documentation> + </xsd:annotation> + </xsd:attribute> + </xsd:complexType> + </xsd:element> + </xsd:sequence> + </xsd:complexType> + </xsd:element> + + <xsd:element name="entries" maxOccurs="1" minOccurs="0"> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="entry" type="entryType" maxOccurs="unbounded" minOccurs="1"> + </xsd:element> + </xsd:sequence> + </xsd:complexType> + </xsd:element> + + <xsd:element name="client-failure-check-period" type="xsd:long" default="30000" + maxOccurs="1" minOccurs="0"> + <xsd:annotation hq:id="configuration.connection-factory.client-failure-check-period" + hq:linkend="dead.connections" hq:default="(ms)"> + <xsd:documentation> + the period (in ms) after which the client will consider the connection failed + after not receiving packets from the server. -1 disables this setting. + </xsd:documentation> + </xsd:annotation> + </xsd:element> + + <xsd:element name="connection-ttl" type="xsd:long" maxOccurs="1" minOccurs="0"> + <xsd:annotation hq:id="configuration.connection-factory.connection-ttl" + hq:linkend="dead.connections"> + <xsd:documentation>the time to live (in ms) for connections + </xsd:documentation> + </xsd:annotation> + </xsd:element> + + <xsd:element name="call-timeout" type="xsd:long" default="30000" + maxOccurs="1" minOccurs="0"> + <xsd:annotation hq:id="configuration.connection-factory.call-timeout"> + <xsd:documentation> + the timeout (in ms) for remote calls + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="call-failover-timeout" type="xsd:long" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="consumer-window-size" type="xsd:int" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="consumer-max-rate" type="xsd:int" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="confirmation-window-size" type="xsd:int" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="producer-window-size" type="xsd:int" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="producer-max-rate" type="xsd:int" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="cache-large-message-client" type="xsd:boolean" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="min-large-message-size" type="xsd:long" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="compress-large-messages" type="xsd:boolean" + maxOccurs="1" minOccurs="0"> + </xsd:element> + + <xsd:element name="client-id" type="xsd:string" maxOccurs="1" minOccurs="0"> + <xsd:annotation hq:id="configuration.connection-factory.client-id" + hq:linkend="using-jms.clientid"> + <xsd:documentation> + the pre-configured client ID for the connection factory + </xsd:documentation> + </xsd:annotation> + </xsd:element> + + <xsd:element name="dups-ok-batch-size" type="xsd:int" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="transaction-batch-size" type="xsd:int" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="block-on-acknowledge" type="xsd:boolean" default="false" + maxOccurs="1" minOccurs="0"> + <xsd:annotation hq:linkend="send-guarantees.nontrans.acks" + hq:id="configuration.connection-factory.block-on-acknowledge"> + <xsd:documentation> + whether or not messages are acknowledged synchronously + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="block-on-non-durable-send" type="xsd:boolean" default="false" + maxOccurs="1" minOccurs="0"> + <xsd:annotation hq:id="configuration.connection-factory.block-on-non-durable-send" + hq:linkend="non-transactional-sends"> + <xsd:documentation> + whether or not non-durable messages are sent synchronously + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="block-on-durable-send" type="xsd:boolean" default="true" + maxOccurs="1" minOccurs="0"> + <xsd:annotation hq:id="configuration.connection-factory.block-on-durable-send" + hq:linkend="non-transactional-sends"> + <xsd:documentation> + whether or not durable messages are sent synchronously + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="auto-group" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0"> + <xsd:annotation hq:id="configuration.connection-factory.auto-group" + hq:linkend="message-grouping.jmsconfigure"> + <xsd:documentation>whether or not message grouping is automatically used + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="pre-acknowledge" type="xsd:boolean" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="retry-interval" type="xsd:long" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="retry-interval-multiplier" type="xsd:float" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="max-retry-interval" type="xsd:long" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="reconnect-attempts" type="xsd:int" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="failover-on-initial-connection" type="xsd:boolean" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="failover-on-server-shutdown" type="xsd:boolean" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="connection-load-balancing-policy-class-name" type="xsd:string" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="use-global-pools" type="xsd:boolean" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="scheduled-thread-pool-max-size" type="xsd:int" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="thread-pool-max-size" type="xsd:int" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="group-id" type="xsd:string" + maxOccurs="1" minOccurs="0"> + </xsd:element> + <xsd:element name="ha" type="xsd:boolean" + maxOccurs="1" minOccurs="0"> + </xsd:element> + </xsd:all> + <xsd:attribute name="name" type="xsd:string"></xsd:attribute> + <xsd:attribute name="signature" type="xsd:string"> + <xsd:annotation hq:id="configuration.connection-factory.signature" + hq:linkend="using-jms.configure.factory.types" + hq:default="generic"> <!-- XXX --> + <xsd:documentation>Type of connection factory</xsd:documentation> + </xsd:annotation> + </xsd:attribute> + </xsd:complexType> + </xsd:element> + + <xsd:complexType name="entryType"> + <xsd:attribute name="name" type="xsd:string" use="required"></xsd:attribute> + </xsd:complexType> + + <xsd:complexType name="discovery-group-refType"> + <xsd:attribute name="discovery-group-name" type="xsd:string" use="required"> + <xsd:annotation> + <xsd:documentation> + Name of discovery group used by this connection factory + </xsd:documentation> + </xsd:annotation> + </xsd:attribute> + </xsd:complexType> + + <xsd:element name="queue" type="queueType"></xsd:element> + + <xsd:element name="topic" type="topicType"></xsd:element> + + <xsd:complexType name="queueType"> + <xsd:sequence> + <xsd:element name="entry" type="entryType" maxOccurs="unbounded" minOccurs="1"></xsd:element> + <xsd:element name="selector" maxOccurs="1" minOccurs="0"> + <xsd:complexType> + <xsd:attribute name="string" type="xsd:string" use="required"></xsd:attribute> + </xsd:complexType> + </xsd:element> + <xsd:element name="durable" type="xsd:boolean" maxOccurs="1" minOccurs="0"></xsd:element> + </xsd:sequence> + <xsd:attribute name="name" type="xsd:string" use="required"></xsd:attribute> + </xsd:complexType> + + <xsd:complexType name="topicType"> + <xsd:sequence> + <xsd:element name="entry" type="entryType" maxOccurs="unbounded" minOccurs="1"></xsd:element> + </xsd:sequence> + <xsd:attribute name="name" type="xsd:string" use="required"></xsd:attribute> + </xsd:complexType> +</xsd:schema> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/pom.xml ---------------------------------------------------------------------- diff --git a/activemq6-journal/pom.xml b/activemq6-journal/pom.xml new file mode 100644 index 0000000..efd1912 --- /dev/null +++ b/activemq6-journal/pom.xml @@ -0,0 +1,92 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.activemq6</groupId> + <artifactId>activemq6-pom</artifactId> + <version>6.0.0-SNAPSHOT</version> + </parent> + + <artifactId>activemq6-journal</artifactId> + <packaging>jar</packaging> + <name>ActiveMQ6 Journal</name> + + <properties> + <hornetq.basedir>${project.basedir}/..</hornetq.basedir> + </properties> + + <dependencies> + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging-processor</artifactId> + </dependency> + + <!-- + JBoss Logging + --> + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging</artifactId> + </dependency> + <dependency> + <groupId>org.jboss.logmanager</groupId> + <artifactId>jboss-logmanager</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq6</groupId> + <artifactId>activemq6-commons</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.activemq6</groupId> + <artifactId>activemq6-native</artifactId> + <version>${project.version}</version> + </dependency> + <!-- needed to compile the tests --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>release</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.9</version> + <configuration> + <doclet>org.jboss.apiviz.APIviz</doclet> + <docletArtifact> + <groupId>org.jboss.apiviz</groupId> + <artifactId>apiviz</artifactId> + <version>1.3.2.GA</version> + </docletArtifact> + <useStandardDocletOptions>true</useStandardDocletOptions> + <minmemory>128m</minmemory> + <maxmemory>512m</maxmemory> + <quiet>false</quiet> + <aggregate>true</aggregate> + <excludePackageNames>org.hornetq.core:org.hornetq.utils</excludePackageNames> + </configuration> + <executions> + <execution> + <id>javadocs</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AIOCallback.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AIOCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AIOCallback.java new file mode 100644 index 0000000..d92fedb --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AIOCallback.java @@ -0,0 +1,31 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.asyncio; + +/** + * The interface used for AIO Callbacks. + * @author [email protected] + * + */ +public interface AIOCallback +{ + /** + * Method for sync notifications. When this callback method is called, there is a guarantee the data is written on the disk. + * <br><b>Note:</b><i>Leave this method as soon as possible, or you would be blocking the whole notification thread</i> */ + void done(); + + /** + * Method for error notifications. + * Observation: The whole file will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations*/ + void onError(int errorCode, String errorMessage); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AsynchronousFile.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AsynchronousFile.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AsynchronousFile.java new file mode 100644 index 0000000..fad8661 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AsynchronousFile.java @@ -0,0 +1,59 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.asyncio; + +import java.nio.ByteBuffer; + +import org.apache.activemq6.api.core.HornetQException; + +/** + * + * @author [email protected] + * + */ +public interface AsynchronousFile +{ + void close() throws InterruptedException, HornetQException; + + /** + * + * Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error + * @param fileName + * @param maxIO The number of max concurrent asynchronous IO operations. It has to be balanced between the size of your writes and the capacity of your disk. + * @throws HornetQException + */ + void open(String fileName, int maxIO) throws HornetQException; + + /** + * Warning: This function will perform a synchronous IO, probably translating to a fstat call + * @throws HornetQException + * */ + long size() throws HornetQException; + + /** Any error will be reported on the callback interface */ + void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback); + + /** + * Performs an internal direct write. + * @throws HornetQException + */ + void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException; + + void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws HornetQException; + + void fill(long position, int blocks, long size, byte fillChar) throws HornetQException; + + void setBufferCallback(BufferCallback callback); + + int getBlockSize(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/BufferCallback.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/BufferCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/BufferCallback.java new file mode 100644 index 0000000..189fb40 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/BufferCallback.java @@ -0,0 +1,27 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.asyncio; + +import java.nio.ByteBuffer; + +/** + * + * Used to receive a notification on completed buffers used by the AIO layer. + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + */ +public interface BufferCallback +{ + void bufferDone(ByteBuffer buffer); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/IOExceptionListener.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/IOExceptionListener.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/IOExceptionListener.java new file mode 100644 index 0000000..892c1d0 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/IOExceptionListener.java @@ -0,0 +1,25 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.asyncio; + +/** + * A IOExceptionListener + * + * @author clebert + * + * + */ +public interface IOExceptionListener +{ + void onIOException(Exception exception, String message); +}
