http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQStreamMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQStreamMessage.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQStreamMessage.java index 199df55..4f3e899 100644 --- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQStreamMessage.java +++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQStreamMessage.java @@ -19,11 +19,24 @@ import javax.jms.StreamMessage; import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.Message; +import org.hornetq.api.core.Pair; import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientSession; import org.hornetq.core.client.impl.ClientMessageImpl; import org.hornetq.utils.DataConstants; +import static org.hornetq.reader.StreamMessageUtil.streamReadBoolean; +import static org.hornetq.reader.StreamMessageUtil.streamReadByte; +import static org.hornetq.reader.StreamMessageUtil.streamReadBytes; +import static org.hornetq.reader.StreamMessageUtil.streamReadChar; +import static org.hornetq.reader.StreamMessageUtil.streamReadDouble; +import static org.hornetq.reader.StreamMessageUtil.streamReadFloat; +import static org.hornetq.reader.StreamMessageUtil.streamReadInteger; +import static org.hornetq.reader.StreamMessageUtil.streamReadLong; +import static org.hornetq.reader.StreamMessageUtil.streamReadObject; +import static org.hornetq.reader.StreamMessageUtil.streamReadShort; +import static org.hornetq.reader.StreamMessageUtil.streamReadString; + /** * HornetQ implementation of a JMS StreamMessage. * @@ -92,18 +105,11 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream checkRead(); try { - byte type = getBuffer().readByte(); - - switch (type) - { - case DataConstants.BOOLEAN: - return getBuffer().readBoolean(); - case DataConstants.STRING: - String s = getBuffer().readNullableString(); - return Boolean.valueOf(s); - default: - throw new MessageFormatException("Invalid conversion, type byte was " + type); - } + return streamReadBoolean(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); } catch (IndexOutOfBoundsException e) { @@ -114,29 +120,18 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream public byte readByte() throws JMSException { checkRead(); - int index = getBuffer().readerIndex(); + try { - byte type = getBuffer().readByte(); - switch (type) - { - case DataConstants.BYTE: - return getBuffer().readByte(); - case DataConstants.STRING: - String s = getBuffer().readNullableString(); - return Byte.parseByte(s); - default: - throw new MessageFormatException("Invalid conversion"); - } + return streamReadByte(message); } - catch (IndexOutOfBoundsException e) + catch (IllegalStateException e) { - throw new MessageEOFException(""); + throw new MessageFormatException(e.getMessage()); } - catch (NumberFormatException e) + catch (IndexOutOfBoundsException e) { - getBuffer().readerIndex(index); - throw e; + throw new MessageEOFException(""); } } @@ -145,19 +140,11 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream checkRead(); try { - byte type = getBuffer().readByte(); - switch (type) - { - case DataConstants.BYTE: - return getBuffer().readByte(); - case DataConstants.SHORT: - return getBuffer().readShort(); - case DataConstants.STRING: - String s = getBuffer().readNullableString(); - return Short.parseShort(s); - default: - throw new MessageFormatException("Invalid conversion"); - } + return streamReadShort(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); } catch (IndexOutOfBoundsException e) { @@ -170,24 +157,11 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream checkRead(); try { - byte type = getBuffer().readByte(); - switch (type) - { - case DataConstants.CHAR: - return (char)getBuffer().readShort(); - case DataConstants.STRING: - String str = getBuffer().readNullableString(); - if (str == null) - { - throw new NullPointerException("Invalid conversion"); - } - else - { - throw new MessageFormatException("Invalid conversion"); - } - default: - throw new MessageFormatException("Invalid conversion"); - } + return streamReadChar(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); } catch (IndexOutOfBoundsException e) { @@ -200,21 +174,11 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream checkRead(); try { - byte type = getBuffer().readByte(); - switch (type) - { - case DataConstants.BYTE: - return getBuffer().readByte(); - case DataConstants.SHORT: - return getBuffer().readShort(); - case DataConstants.INT: - return getBuffer().readInt(); - case DataConstants.STRING: - String s = getBuffer().readNullableString(); - return Integer.parseInt(s); - default: - throw new MessageFormatException("Invalid conversion"); - } + return streamReadInteger(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); } catch (IndexOutOfBoundsException e) { @@ -227,23 +191,11 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream checkRead(); try { - byte type = getBuffer().readByte(); - switch (type) - { - case DataConstants.BYTE: - return getBuffer().readByte(); - case DataConstants.SHORT: - return getBuffer().readShort(); - case DataConstants.INT: - return getBuffer().readInt(); - case DataConstants.LONG: - return getBuffer().readLong(); - case DataConstants.STRING: - String s = getBuffer().readNullableString(); - return Long.parseLong(s); - default: - throw new MessageFormatException("Invalid conversion"); - } + return streamReadLong(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); } catch (IndexOutOfBoundsException e) { @@ -256,17 +208,11 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream checkRead(); try { - byte type = getBuffer().readByte(); - switch (type) - { - case DataConstants.FLOAT: - return Float.intBitsToFloat(getBuffer().readInt()); - case DataConstants.STRING: - String s = getBuffer().readNullableString(); - return Float.parseFloat(s); - default: - throw new MessageFormatException("Invalid conversion"); - } + return streamReadFloat(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); } catch (IndexOutOfBoundsException e) { @@ -279,19 +225,11 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream checkRead(); try { - byte type = getBuffer().readByte(); - switch (type) - { - case DataConstants.FLOAT: - return Float.intBitsToFloat(getBuffer().readInt()); - case DataConstants.DOUBLE: - return Double.longBitsToDouble(getBuffer().readLong()); - case DataConstants.STRING: - String s = getBuffer().readNullableString(); - return Double.parseDouble(s); - default: - throw new MessageFormatException("Invalid conversion: " + type); - } + return streamReadDouble(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); } catch (IndexOutOfBoundsException e) { @@ -304,30 +242,11 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream checkRead(); try { - byte type = getBuffer().readByte(); - switch (type) - { - case DataConstants.BOOLEAN: - return String.valueOf(getBuffer().readBoolean()); - case DataConstants.BYTE: - return String.valueOf(getBuffer().readByte()); - case DataConstants.SHORT: - return String.valueOf(getBuffer().readShort()); - case DataConstants.CHAR: - return String.valueOf((char)getBuffer().readShort()); - case DataConstants.INT: - return String.valueOf(getBuffer().readInt()); - case DataConstants.LONG: - return String.valueOf(getBuffer().readLong()); - case DataConstants.FLOAT: - return String.valueOf(Float.intBitsToFloat(getBuffer().readInt())); - case DataConstants.DOUBLE: - return String.valueOf(Double.longBitsToDouble(getBuffer().readLong())); - case DataConstants.STRING: - return getBuffer().readNullableString(); - default: - throw new MessageFormatException("Invalid conversion"); - } + return streamReadString(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); } catch (IndexOutOfBoundsException e) { @@ -335,35 +254,24 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream } } - private int len; + /** + * len here is used to control how many more bytes to read + */ + private int len = 0; public int readBytes(final byte[] value) throws JMSException { checkRead(); try { - if (len == -1) - { - len = 0; - return -1; - } - else if (len == 0) - { - byte type = getBuffer().readByte(); - if (type != DataConstants.BYTES) - { - throw new MessageFormatException("Invalid conversion"); - } - len = getBuffer().readInt(); - } - int read = Math.min(value.length, len); - getBuffer().readBytes(value, 0, read); - len -= read; - if (len == 0) - { - len = -1; - } - return read; + Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value); + + len = pairRead.getA(); + return pairRead.getB(); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); } catch (IndexOutOfBoundsException e) { @@ -376,35 +284,11 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream checkRead(); try { - byte type = getBuffer().readByte(); - switch (type) - { - case DataConstants.BOOLEAN: - return getBuffer().readBoolean(); - case DataConstants.BYTE: - return getBuffer().readByte(); - case DataConstants.SHORT: - return getBuffer().readShort(); - case DataConstants.CHAR: - return (char)getBuffer().readShort(); - case DataConstants.INT: - return getBuffer().readInt(); - case DataConstants.LONG: - return getBuffer().readLong(); - case DataConstants.FLOAT: - return Float.intBitsToFloat(getBuffer().readInt()); - case DataConstants.DOUBLE: - return Double.longBitsToDouble(getBuffer().readLong()); - case DataConstants.STRING: - return getBuffer().readNullableString(); - case DataConstants.BYTES: - int bufferLen = getBuffer().readInt(); - byte[] bytes = new byte[bufferLen]; - getBuffer().readBytes(bytes); - return bytes; - default: - throw new MessageFormatException("Invalid conversion"); - } + return streamReadObject(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); } catch (IndexOutOfBoundsException e) { @@ -555,7 +439,7 @@ public final class HornetQStreamMessage extends HornetQMessage implements Stream // HornetQRAMessage overrides ---------------------------------------- @Override - public void clearBody() + public void clearBody() throws JMSException { super.clearBody();
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQTextMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQTextMessage.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQTextMessage.java index 35b587b..96b7a45 100644 --- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQTextMessage.java +++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQTextMessage.java @@ -15,13 +15,16 @@ package org.hornetq.jms.client; import javax.jms.JMSException; import javax.jms.TextMessage; -import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.Message; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientSession; +import static org.hornetq.reader.TextMessageUtil.readBodyText; +import static org.hornetq.reader.TextMessageUtil.writeBodyText; + + /** * HornetQ implementation of a JMS TextMessage. * <br> @@ -85,10 +88,6 @@ public class HornetQTextMessage extends HornetQMessage implements TextMessage { checkWrite(); - HornetQBuffer buff = message.getBodyBuffer(); - - buff.clear(); - if (text != null) { this.text = new SimpleString(text); @@ -98,7 +97,7 @@ public class HornetQTextMessage extends HornetQMessage implements TextMessage this.text = null; } - buff.writeNullableSimpleString(this.text); + writeBodyText(message, this.text); } public String getText() @@ -114,7 +113,7 @@ public class HornetQTextMessage extends HornetQMessage implements TextMessage } @Override - public void clearBody() + public void clearBody() throws JMSException { super.clearBody(); @@ -128,7 +127,7 @@ public class HornetQTextMessage extends HornetQMessage implements TextMessage { super.doBeforeReceive(); - text = message.getBodyBuffer().readNullableSimpleString(); + text = readBodyText(message); } @Override http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/pom.xml ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/pom.xml b/hornetq-jms-server/pom.xml index c0323f6..60a7c46 100644 --- a/hornetq-jms-server/pom.xml +++ b/hornetq-jms-server/pom.xml @@ -42,7 +42,7 @@ </dependency> <dependency> <groupId>org.jboss.spec.javax.transaction</groupId> - <artifactId>jboss-transaction-api_1.1_spec</artifactId> + <artifactId>jboss-transaction-api_1.2_spec</artifactId> </dependency> <dependency> <groupId>org.jboss.jbossts.jts</groupId> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/HornetQJMSBridgeLogger.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/HornetQJMSBridgeLogger.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/HornetQJMSBridgeLogger.java new file mode 100644 index 0000000..688c7d0 --- /dev/null +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/HornetQJMSBridgeLogger.java @@ -0,0 +1,101 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.jms.bridge; + +import javax.management.ObjectName; + +import org.jboss.logging.BasicLogger; +import org.jboss.logging.Logger; +import org.jboss.logging.annotations.Cause; +import org.jboss.logging.annotations.LogMessage; +import org.jboss.logging.annotations.Message; +import org.jboss.logging.annotations.MessageLogger; + +/** + * @author <a href="mailto:[email protected]">Martyn Taylor</a> + * + * Logger Code 34 + * + * each message id must be 6 digits long starting with 12, the 3rd digit donates the level so + * + * INF0 1 + * WARN 2 + * DEBUG 3 + * ERROR 4 + * TRACE 5 + * FATAL 6 + * + * so an INFO message would be 341000 to 341999 + */ +@MessageLogger(projectCode = "HQ") +public interface HornetQJMSBridgeLogger extends BasicLogger +{ + /** + * The default logger. + */ + HornetQJMSBridgeLogger LOGGER = Logger.getMessageLogger(HornetQJMSBridgeLogger.class, HornetQJMSBridgeLogger.class.getPackage().getName()); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 341000, value = "Failed to set up JMS bridge connections. Most probably the source or target servers are unavailable." + + " Will retry after a pause of {0} ms", format = Message.Format.MESSAGE_FORMAT) + void failedToSetUpBridge(long failureRetryInterval); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 341001, value = "JMS Bridge Succeeded in reconnecting to servers" , format = Message.Format.MESSAGE_FORMAT) + void bridgeReconnected(); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 341002, value = "Succeeded in connecting to servers" , format = Message.Format.MESSAGE_FORMAT) + void bridgeConnected(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 342000, value = "Attempt to start JMS Bridge, but is already started" , format = Message.Format.MESSAGE_FORMAT) + void errorBridgeAlreadyStarted(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 342001, value = "Failed to start JMS Bridge" , format = Message.Format.MESSAGE_FORMAT) + void errorStartingBridge(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 342002, value = "Failed to unregisted JMS Bridge {0}" , format = Message.Format.MESSAGE_FORMAT) + void errorUnregisteringBridge(ObjectName objectName); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 342003, value = "JMS Bridge unable to set up connections, bridge will be stopped" , format = Message.Format.MESSAGE_FORMAT) + void errorConnectingBridge(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 342004, value = "JMS Bridge Will retry after a pause of {0} ms" , format = Message.Format.MESSAGE_FORMAT) + void bridgeRetry(long failureRetryInterval); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 342005, value = "JMS Bridge unable to set up connections, bridge will not be started" , format = Message.Format.MESSAGE_FORMAT) + void bridgeNotStarted(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 342006, value = "Detected failure on bridge connection" , format = Message.Format.MESSAGE_FORMAT) + void bridgeFailure(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 342009, value = "JMS Bridge failed to send + acknowledge batch, closing JMS objects" , format = Message.Format.MESSAGE_FORMAT) + void bridgeAckError(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 342010, value = "Failed to connect JMS Bridge", format = Message.Format.MESSAGE_FORMAT) + void bridgeConnectError(@Cause Exception e); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 344001, value = "Failed to start source connection" , format = Message.Format.MESSAGE_FORMAT) + void jmsBridgeSrcConnectError(@Cause Exception e); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java index 09eec8c..ace70ef 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java @@ -52,6 +52,7 @@ import org.hornetq.api.core.client.FailoverEventType; import org.hornetq.api.jms.HornetQJMSConstants; import org.hornetq.jms.bridge.ConnectionFactoryFactory; import org.hornetq.jms.bridge.DestinationFactory; +import org.hornetq.jms.bridge.HornetQJMSBridgeLogger; import org.hornetq.jms.bridge.JMSBridge; import org.hornetq.jms.bridge.JMSBridgeControl; import org.hornetq.jms.bridge.QualityOfServiceMode; @@ -59,7 +60,6 @@ import org.hornetq.jms.client.HornetQConnection; import org.hornetq.jms.client.HornetQConnectionFactory; import org.hornetq.jms.client.HornetQMessage; import org.hornetq.jms.server.HornetQJMSServerBundle; -import org.hornetq.jms.server.HornetQJMSServerLogger; import org.hornetq.jms.server.recovery.HornetQRegistryBase; import org.hornetq.jms.server.recovery.XARecoveryConfig; import org.hornetq.utils.ClassloadingUtil; @@ -71,13 +71,14 @@ import org.hornetq.utils.SensitiveDataCodec; * A JMSBridge * * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> * @version <tt>$Revision:4566 $</tt> */ public final class JMSBridgeImpl implements JMSBridge { private static final String[] RESOURCE_RECOVERY_CLASS_NAMES = new String[]{"org.jboss.as.messaging.jms.AS7RecoveryRegistry"}; - private static boolean trace = HornetQJMSServerLogger.LOGGER.isTraceEnabled(); + private static boolean trace = HornetQJMSBridgeLogger.LOGGER.isTraceEnabled(); private static final int TEN_YEARS = 60 * 60 * 24 * 365 * 10; // in ms @@ -156,6 +157,10 @@ public final class JMSBridgeImpl implements JMSBridge private boolean failed; + private boolean connectedSource = false; + + private boolean connectedTarget = false; + private int forwardMode; private String transactionManagerLocatorClass = "org.hornetq.integration.jboss.tm.JBoss5TransactionManagerLocator"; @@ -344,7 +349,7 @@ public final class JMSBridgeImpl implements JMSBridge this.objectName = ObjectName.getInstance(objectName); StandardMBean mbean = new StandardMBean(controlBean, JMSBridgeControl.class); mbeanServer.registerMBean(mbean, this.objectName); - HornetQJMSServerLogger.LOGGER.debug("Registered JMSBridge instance as: " + this.objectName.getCanonicalName()); + HornetQJMSBridgeLogger.LOGGER.debug("Registered JMSBridge instance as: " + this.objectName.getCanonicalName()); } catch (Exception e) { @@ -359,7 +364,7 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Created " + this); + HornetQJMSBridgeLogger.LOGGER.trace("Created " + this); } } @@ -376,13 +381,13 @@ public final class JMSBridgeImpl implements JMSBridge if (started) { - HornetQJMSServerLogger.LOGGER.errorBridgeAlreadyStarted(); + HornetQJMSBridgeLogger.LOGGER.errorBridgeAlreadyStarted(); return; } if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Starting " + this); + HornetQJMSBridgeLogger.LOGGER.trace("Starting " + this); } // bridge has been stopped and is restarted @@ -418,11 +423,13 @@ public final class JMSBridgeImpl implements JMSBridge if (ok) { + connectedSource = true; + connectedTarget = true; startSource(); } else { - HornetQJMSServerLogger.LOGGER.errorStartingBridge(); + HornetQJMSBridgeLogger.LOGGER.errorStartingBridge(); handleFailureOnStartup(); } @@ -440,7 +447,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Starting time checker thread"); + HornetQJMSBridgeLogger.LOGGER.trace("Starting time checker thread"); } timeChecker = new BatchTimeChecker(); @@ -450,7 +457,7 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Started time checker thread"); + HornetQJMSBridgeLogger.LOGGER.trace("Started time checker thread"); } } @@ -458,7 +465,7 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Started " + this); + HornetQJMSBridgeLogger.LOGGER.trace("Started " + this); } } @@ -505,9 +512,16 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Stopping " + this); + HornetQJMSBridgeLogger.LOGGER.trace("Stopping " + this); + } + if (!connectedSource && sourceConn != null) + { + sourceConn.close(); + } + if (!connectedTarget && targetConn != null) + { + targetConn.close(); } - synchronized (lock) { started = false; @@ -527,7 +541,7 @@ public final class JMSBridgeImpl implements JMSBridge // Terminate any transaction if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Rolling back remaining tx"); + HornetQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx"); } try @@ -538,13 +552,13 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failed to rollback", ignore); + HornetQJMSBridgeLogger.LOGGER.trace("Failed to rollback", ignore); } } if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Rolled back remaining tx"); + HornetQJMSBridgeLogger.LOGGER.trace("Rolled back remaining tx"); } } @@ -556,7 +570,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failed to close source conn", ignore); + HornetQJMSBridgeLogger.LOGGER.trace("Failed to close source conn", ignore); } } @@ -570,14 +584,14 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failed to close target conn", ignore); + HornetQJMSBridgeLogger.LOGGER.trace("Failed to close target conn", ignore); } } } if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Stopped " + this); + HornetQJMSBridgeLogger.LOGGER.trace("Stopped " + this); } } } @@ -597,7 +611,7 @@ public final class JMSBridgeImpl implements JMSBridge } catch (Exception e) { - HornetQJMSServerLogger.LOGGER.errorUnregisteringBridge(objectName); + HornetQJMSBridgeLogger.LOGGER.errorUnregisteringBridge(objectName); } } } @@ -608,7 +622,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Pausing " + this); + HornetQJMSBridgeLogger.LOGGER.trace("Pausing " + this); } synchronized (lock) @@ -620,7 +634,7 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Paused " + this); + HornetQJMSBridgeLogger.LOGGER.trace("Paused " + this); } } @@ -628,7 +642,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Resuming " + this); + HornetQJMSBridgeLogger.LOGGER.trace("Resuming " + this); } synchronized (lock) @@ -640,7 +654,7 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Resumed " + this); + HornetQJMSBridgeLogger.LOGGER.trace("Resumed " + this); } } @@ -955,7 +969,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Enlisting resources in tx"); + HornetQJMSBridgeLogger.LOGGER.trace("Enlisting resources in tx"); } XAResource resSource = ((XASession) sourceSession).getXAResource(); @@ -968,7 +982,7 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Enlisted resources in tx"); + HornetQJMSBridgeLogger.LOGGER.trace("Enlisted resources in tx"); } } @@ -976,7 +990,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Delisting resources from tx"); + HornetQJMSBridgeLogger.LOGGER.trace("Delisting resources from tx"); } XAResource resSource = ((XASession) sourceSession).getXAResource(); @@ -989,7 +1003,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failed to delist source resource", e); + HornetQJMSBridgeLogger.LOGGER.trace("Failed to delist source resource", e); } } @@ -1003,13 +1017,13 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failed to delist target resource", e); + HornetQJMSBridgeLogger.LOGGER.trace("Failed to delist target resource", e); } } if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Delisted resources from tx"); + HornetQJMSBridgeLogger.LOGGER.trace("Delisted resources from tx"); } } @@ -1017,7 +1031,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Starting JTA transaction"); + HornetQJMSBridgeLogger.LOGGER.trace("Starting JTA transaction"); } TransactionManager tm = getTm(); @@ -1037,7 +1051,7 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Started JTA transaction"); + HornetQJMSBridgeLogger.LOGGER.trace("Started JTA transaction"); } return tx; @@ -1073,7 +1087,8 @@ public final class JMSBridgeImpl implements JMSBridge private Connection createConnection(final String username, final String password, final ConnectionFactoryFactory cff, final String clientID, - final boolean isXA) throws Exception + final boolean isXA, + boolean isSource) throws Exception { Connection conn; @@ -1095,7 +1110,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Creating an XA connection"); + HornetQJMSBridgeLogger.LOGGER.trace("Creating an XA connection"); } conn = ((XAConnectionFactory) cf).createXAConnection(); } @@ -1103,7 +1118,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Creating a non XA connection"); + HornetQJMSBridgeLogger.LOGGER.trace("Creating a non XA connection"); } conn = ((ConnectionFactory) cf).createConnection(); } @@ -1114,7 +1129,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Creating an XA connection"); + HornetQJMSBridgeLogger.LOGGER.trace("Creating an XA connection"); } conn = ((XAConnectionFactory) cf).createXAConnection(username, password); } @@ -1122,7 +1137,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Creating a non XA connection"); + HornetQJMSBridgeLogger.LOGGER.trace("Creating a non XA connection"); } conn = ((ConnectionFactory) cf).createConnection(username, password); } @@ -1144,12 +1159,12 @@ public final class JMSBridgeImpl implements JMSBridge if (ha) { HornetQConnection hornetQConn = (HornetQConnection) conn; - failoverListener = new BridgeFailoverListener(); + failoverListener = new BridgeFailoverListener(isSource); hornetQConn.setFailoverListener(failoverListener); } } - conn.setExceptionListener(new BridgeExceptionListener(ha, failoverListener)); + conn.setExceptionListener(new BridgeExceptionListener(ha, failoverListener, isSource)); return conn; } @@ -1223,7 +1238,7 @@ public final class JMSBridgeImpl implements JMSBridge { // We simply use a single local transacted session for consuming and sending - sourceConn = createConnection(sourceUsername, sourcePassword, sourceCff, clientID, false); + sourceConn = createConnection(sourceUsername, sourcePassword, sourceCff, clientID, false, true); sourceSession = sourceConn.createSession(true, Session.SESSION_TRANSACTED); } else // bridging across different servers @@ -1234,20 +1249,20 @@ public final class JMSBridgeImpl implements JMSBridge // Create an XASession for consuming from the source if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Creating XA source session"); + HornetQJMSBridgeLogger.LOGGER.trace("Creating XA source session"); } - sourceConn = createConnection(sourceUsername, sourcePassword, sourceCff, clientID, true); + sourceConn = createConnection(sourceUsername, sourcePassword, sourceCff, clientID, true, true); sourceSession = ((XAConnection) sourceConn).createXASession(); } else // QoS = DUPLICATES_OK || AT_MOST_ONCE { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Creating non XA source session"); + HornetQJMSBridgeLogger.LOGGER.trace("Creating non XA source session"); } - sourceConn = createConnection(sourceUsername, sourcePassword, sourceCff, clientID, false); + sourceConn = createConnection(sourceUsername, sourcePassword, sourceCff, clientID, false, true); if (qualityOfServiceMode == QualityOfServiceMode.AT_MOST_ONCE && maxBatchSize == 1) { sourceSession = sourceConn.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1298,12 +1313,12 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Creating XA dest session"); + HornetQJMSBridgeLogger.LOGGER.trace("Creating XA dest session"); } // Create an XA session for sending to the destination - targetConn = createConnection(targetUsername, targetPassword, targetCff, null, true); + targetConn = createConnection(targetUsername, targetPassword, targetCff, null, true, false); targetSession = ((XAConnection) targetConn).createXASession(); } @@ -1311,7 +1326,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Creating non XA dest session"); + HornetQJMSBridgeLogger.LOGGER.trace("Creating non XA dest session"); } // Create a standard session for sending to the target @@ -1320,7 +1335,7 @@ public final class JMSBridgeImpl implements JMSBridge boolean transacted = maxBatchSize > 1; - targetConn = createConnection(targetUsername, targetPassword, targetCff, null, false); + targetConn = createConnection(targetUsername, targetPassword, targetCff, null, false, false); targetSession = targetConn.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); @@ -1331,7 +1346,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Starting JTA transaction"); + HornetQJMSBridgeLogger.LOGGER.trace("Starting JTA transaction"); } tx = startTx(); @@ -1350,7 +1365,7 @@ public final class JMSBridgeImpl implements JMSBridge // If this fails we should attempt to cleanup or we might end up in some weird state // Adding a log.warn, so the use may see the cause of the failure and take actions - HornetQJMSServerLogger.LOGGER.bridgeConnectError(e); + HornetQJMSBridgeLogger.LOGGER.bridgeConnectError(e); cleanup(); @@ -1369,7 +1384,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failed to stop source connection", ignore); + HornetQJMSBridgeLogger.LOGGER.trace("Failed to stop source connection", ignore); } } @@ -1383,7 +1398,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failed to delist resources", ignore); + HornetQJMSBridgeLogger.LOGGER.trace("Failed to delist resources", ignore); } } try @@ -1395,7 +1410,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failed to rollback", ignore); + HornetQJMSBridgeLogger.LOGGER.trace("Failed to rollback", ignore); } } } @@ -1409,7 +1424,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failed to close source connection", ignore); + HornetQJMSBridgeLogger.LOGGER.trace("Failed to close source connection", ignore); } } try @@ -1423,7 +1438,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failed to close target connection", ignore); + HornetQJMSBridgeLogger.LOGGER.trace("Failed to close target connection", ignore); } } } @@ -1447,7 +1462,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Setting up connections"); + HornetQJMSBridgeLogger.LOGGER.trace("Setting up connections"); } int count = 0; @@ -1468,7 +1483,7 @@ public final class JMSBridgeImpl implements JMSBridge break; } - HornetQJMSServerLogger.LOGGER.failedToSetUpBridge(failureRetryInterval); + HornetQJMSBridgeLogger.LOGGER.failedToSetUpBridge(failureRetryInterval); pause(failureRetryInterval); } @@ -1481,7 +1496,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Sending batch of " + messages.size() + " messages"); + HornetQJMSBridgeLogger.LOGGER.trace("Sending batch of " + messages.size() + " messages"); } if (paused) @@ -1489,7 +1504,7 @@ public final class JMSBridgeImpl implements JMSBridge // Don't send now if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Paused, so not sending now"); + HornetQJMSBridgeLogger.LOGGER.trace("Paused, so not sending now"); } return; @@ -1520,14 +1535,14 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Client acking source session"); + HornetQJMSBridgeLogger.LOGGER.trace("Client acking source session"); } messages.getLast().acknowledge(); if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Client acked source session"); + HornetQJMSBridgeLogger.LOGGER.trace("Client acked source session"); } } @@ -1542,7 +1557,7 @@ public final class JMSBridgeImpl implements JMSBridge } catch (TransactionRolledbackException e) { - HornetQJMSServerLogger.LOGGER.warn(e.getMessage() + ", retrying TX", e); + HornetQJMSBridgeLogger.LOGGER.warn(e.getMessage() + ", retrying TX", e); exHappened = true; } } @@ -1554,14 +1569,14 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Committing target session"); + HornetQJMSBridgeLogger.LOGGER.trace("Committing target session"); } targetSession.commit(); if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Committed target session"); + HornetQJMSBridgeLogger.LOGGER.trace("Committed target session"); } } @@ -1574,30 +1589,36 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Client acking source session"); + HornetQJMSBridgeLogger.LOGGER.trace("Client acking source session"); } messages.getLast().acknowledge(); if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Client acked source session"); + HornetQJMSBridgeLogger.LOGGER.trace("Client acked source session"); } } } catch (Exception e) { - HornetQJMSServerLogger.LOGGER.bridgeAckError(e); + if (!stopping) + { + HornetQJMSBridgeLogger.LOGGER.bridgeAckError(e); + } // We don't call failure otherwise failover would be broken with HornetQ // We let the ExceptionListener to deal with failures - try - { - sourceSession.recover(); - } - catch (Throwable ignored) + if (connectedSource) { + try + { + sourceSession.recover(); + } + catch (Throwable ignored) + { + } } } @@ -1620,14 +1641,14 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Committing JTA transaction"); + HornetQJMSBridgeLogger.LOGGER.trace("Committing JTA transaction"); } tx.commit(); if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Committed JTA transaction"); + HornetQJMSBridgeLogger.LOGGER.trace("Committed JTA transaction"); } } catch (Exception e) @@ -1641,7 +1662,7 @@ public final class JMSBridgeImpl implements JMSBridge { } - HornetQJMSServerLogger.LOGGER.bridgeAckError(e); + HornetQJMSBridgeLogger.LOGGER.bridgeAckError(e); //we don't do handle failure here because the tx //may be rolledback due to failover. All failure handling @@ -1662,7 +1683,7 @@ public final class JMSBridgeImpl implements JMSBridge } catch (Exception e) { - HornetQJMSServerLogger.LOGGER.bridgeAckError(e); + HornetQJMSBridgeLogger.LOGGER.bridgeAckError(e); handleFailureOnSend(); } @@ -1677,20 +1698,20 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Committing source session"); + HornetQJMSBridgeLogger.LOGGER.trace("Committing source session"); } sourceSession.commit(); if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Committed source session"); + HornetQJMSBridgeLogger.LOGGER.trace("Committed source session"); } } catch (Exception e) { - HornetQJMSServerLogger.LOGGER.bridgeAckError(e); + HornetQJMSBridgeLogger.LOGGER.bridgeAckError(e); try { @@ -1732,7 +1753,7 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Sending message " + msg); + HornetQJMSBridgeLogger.LOGGER.trace("Sending message " + msg); } // Make sure the correct time to live gets propagated @@ -1753,7 +1774,7 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Sent message " + msg); + HornetQJMSBridgeLogger.LOGGER.trace("Sent message " + msg); } } } @@ -1789,7 +1810,7 @@ public final class JMSBridgeImpl implements JMSBridge // distributed request/response if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Adding old message id in Message header"); + HornetQJMSBridgeLogger.LOGGER.trace("Adding old message id in Message header"); } JMSBridgeImpl.copyProperties(msg); @@ -1893,6 +1914,10 @@ public final class JMSBridgeImpl implements JMSBridge { while (started) { + if (stopping) + { + return; + } synchronized (lock) { if (paused || failed) @@ -1903,6 +1928,10 @@ public final class JMSBridgeImpl implements JMSBridge } catch (InterruptedException e) { + if (stopping) + { + return; + } throw new HornetQInterruptedException(e); } continue; @@ -1924,7 +1953,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " exception while receiving a message", jmse); + HornetQJMSBridgeLogger.LOGGER.trace(this + " exception while receiving a message", jmse); } } @@ -1938,7 +1967,11 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " thread was interrupted"); + HornetQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted"); + } + if (stopping) + { + return; } throw new HornetQInterruptedException(e); } @@ -1947,7 +1980,7 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " received message " + msg); + HornetQJMSBridgeLogger.LOGGER.trace(this + " received message " + msg); } messages.add(msg); @@ -1956,21 +1989,21 @@ public final class JMSBridgeImpl implements JMSBridge if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); + HornetQJMSBridgeLogger.LOGGER.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); } if (maxBatchSize != -1 && messages.size() >= maxBatchSize) { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " maxBatchSize has been reached so sending batch"); + HornetQJMSBridgeLogger.LOGGER.trace(this + " maxBatchSize has been reached so sending batch"); } sendBatch(); if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " sent batch"); + HornetQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); } } } @@ -1992,14 +2025,15 @@ public final class JMSBridgeImpl implements JMSBridge } catch (JMSException e) { - HornetQJMSServerLogger.LOGGER.jmsBridgeSrcConnectError(e); + HornetQJMSBridgeLogger.LOGGER.jmsBridgeSrcConnectError(e); } } protected void succeeded() { - HornetQJMSServerLogger.LOGGER.bridgeReconnected(); - + HornetQJMSBridgeLogger.LOGGER.bridgeReconnected(); + connectedSource = true; + connectedTarget = true; synchronized (lock) { failed = false; @@ -2011,7 +2045,7 @@ public final class JMSBridgeImpl implements JMSBridge protected void failed() { // We haven't managed to recreate connections or maxRetries = 0 - HornetQJMSServerLogger.LOGGER.errorConnectingBridge(); + HornetQJMSBridgeLogger.LOGGER.errorConnectingBridge(); try { @@ -2026,7 +2060,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failure handler running"); + HornetQJMSBridgeLogger.LOGGER.trace("Failure handler running"); } // Clear the messages @@ -2038,7 +2072,7 @@ public final class JMSBridgeImpl implements JMSBridge if (maxRetries > 0 || maxRetries == -1) { - HornetQJMSServerLogger.LOGGER.bridgeRetry(failureRetryInterval); + HornetQJMSBridgeLogger.LOGGER.bridgeRetry(failureRetryInterval); pause(failureRetryInterval); @@ -2063,17 +2097,20 @@ public final class JMSBridgeImpl implements JMSBridge protected void failed() { // Don't call super - HornetQJMSServerLogger.LOGGER.bridgeNotStarted(); + HornetQJMSBridgeLogger.LOGGER.bridgeNotStarted(); } @Override protected void succeeded() { // Don't call super - a bit ugly in this case but better than taking the lock twice. - HornetQJMSServerLogger.LOGGER.bridgeConnected(); + HornetQJMSBridgeLogger.LOGGER.bridgeConnected(); synchronized (lock) { + + connectedSource = true; + connectedTarget = true; failed = false; started = true; @@ -2086,7 +2123,7 @@ public final class JMSBridgeImpl implements JMSBridge } catch (JMSException e) { - HornetQJMSServerLogger.LOGGER.jmsBridgeSrcConnectError(e); + HornetQJMSBridgeLogger.LOGGER.jmsBridgeSrcConnectError(e); } } } @@ -2098,7 +2135,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " running"); + HornetQJMSBridgeLogger.LOGGER.trace(this + " running"); } synchronized (lock) @@ -2111,7 +2148,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " waited enough"); + HornetQJMSBridgeLogger.LOGGER.trace(this + " waited enough"); } synchronized (lock) @@ -2120,14 +2157,14 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " got some messages so sending batch"); + HornetQJMSBridgeLogger.LOGGER.trace(this + " got some messages so sending batch"); } sendBatch(); if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " sent batch"); + HornetQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); } } } @@ -2140,21 +2177,25 @@ public final class JMSBridgeImpl implements JMSBridge { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " waiting for " + toWait); + HornetQJMSBridgeLogger.LOGGER.trace(this + " waiting for " + toWait); } lock.wait(toWait); if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " woke up"); + HornetQJMSBridgeLogger.LOGGER.trace(this + " woke up"); } } catch (InterruptedException e) { if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace(this + " thread was interrupted"); + HornetQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted"); + } + if (stopping) + { + return; } throw new HornetQInterruptedException(e); } @@ -2169,25 +2210,43 @@ public final class JMSBridgeImpl implements JMSBridge { boolean ha; BridgeFailoverListener failoverListener; + private final boolean isSource; - public BridgeExceptionListener(boolean ha, BridgeFailoverListener failoverListener) + public BridgeExceptionListener(boolean ha, BridgeFailoverListener failoverListener, boolean isSource) { this.ha = ha; this.failoverListener = failoverListener; + this.isSource = isSource; } public void onException(final JMSException e) { - HornetQJMSServerLogger.LOGGER.bridgeFailure(e); + if (stopping) + { + return; + } + HornetQJMSBridgeLogger.LOGGER.bridgeFailure(e); + if (isSource) + { + connectedSource = false; + } + else + { + connectedTarget = false; + } synchronized (lock) { + if (stopping) + { + return; + } if (failed) { // The failure has already been detected and is being handled if (JMSBridgeImpl.trace) { - HornetQJMSServerLogger.LOGGER.trace("Failure recovery already in progress"); + HornetQJMSBridgeLogger.LOGGER.trace("Failure recovery already in progress"); } } else @@ -2221,7 +2280,7 @@ public final class JMSBridgeImpl implements JMSBridge } catch (Throwable e) { - HornetQJMSServerLogger.LOGGER.debug("unable to load recovery registry " + locatorClasse, e); + HornetQJMSBridgeLogger.LOGGER.debug("unable to load recovery registry " + locatorClasse, e); } if (registry != null) { @@ -2231,7 +2290,7 @@ public final class JMSBridgeImpl implements JMSBridge if (registry != null) { - HornetQJMSServerLogger.LOGGER.debug("Recovery Registry located = " + registry); + HornetQJMSBridgeLogger.LOGGER.debug("Recovery Registry located = " + registry); } } } @@ -2274,14 +2333,31 @@ public final class JMSBridgeImpl implements JMSBridge private class BridgeFailoverListener implements FailoverEventListener { + private final boolean isSource; volatile FailoverEventType lastEvent; + public BridgeFailoverListener(boolean isSource) + { + this.isSource = isSource; + } + @Override public void failoverEvent(FailoverEventType eventType) { synchronized (this) { lastEvent = eventType; + if (eventType == FailoverEventType.FAILURE_DETECTED) + { + if (isSource) + { + connectedSource = false; + } + else + { + connectedTarget = false; + } + } this.notify(); } } @@ -2324,12 +2400,23 @@ public final class JMSBridgeImpl implements JMSBridge if (timedOut) { //timeout, presumably failover failed. - HornetQJMSServerLogger.LOGGER.debug("Timed out waiting for failover completion " + this); + HornetQJMSBridgeLogger.LOGGER.debug("Timed out waiting for failover completion " + this); return false; } + /* + * make sure we reset the connected flags + * */ if (result == FailoverEventType.FAILOVER_COMPLETED) { + if (isSource) + { + connectedSource = true; + } + else + { + connectedTarget = true; + } return true; } //failover failed, need retry. http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSQueueControlImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSQueueControlImpl.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSQueueControlImpl.java index 6d646c9..9c597e2 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSQueueControlImpl.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSQueueControlImpl.java @@ -362,6 +362,11 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro return coreQueueControl.getFilter(); } + public void flushExecutor() + { + coreQueueControl.flushExecutor(); + } + @Override public MBeanInfo getMBeanInfo() { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java index db3e812..f575b1d 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java @@ -21,6 +21,7 @@ import javax.management.NotificationBroadcasterSupport; import javax.management.NotificationEmitter; import javax.management.NotificationFilter; import javax.management.NotificationListener; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -28,7 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.core.management.Parameter; import org.hornetq.api.jms.JMSFactoryType; import org.hornetq.api.jms.management.ConnectionFactoryControl; @@ -46,7 +46,9 @@ import org.hornetq.jms.server.HornetQJMSServerLogger; import org.hornetq.jms.server.JMSServerManager; import org.hornetq.jms.server.config.ConnectionFactoryConfiguration; import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl; +import org.hornetq.jms.server.management.JMSNotificationType; import org.hornetq.spi.core.protocol.RemotingConnection; +import org.hornetq.utils.TypedProperties; import org.hornetq.utils.json.JSONArray; import org.hornetq.utils.json.JSONObject; @@ -54,7 +56,8 @@ import org.hornetq.utils.json.JSONObject; * @author <a href="mailto:[email protected]">Jeff Mesnil</a> * @author <a href="mailto:[email protected]">Tim Fox</a> */ -public class JMSServerControlImpl extends AbstractControl implements JMSServerControl, NotificationEmitter +public class JMSServerControlImpl extends AbstractControl implements JMSServerControl, NotificationEmitter, + org.hornetq.core.server.management.NotificationListener { // Constants ----------------------------------------------------- @@ -129,7 +132,7 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo public static MBeanNotificationInfo[] getNotificationInfos() { - NotificationType[] values = NotificationType.values(); + JMSNotificationType[] values = JMSNotificationType.values(); String[] names = new String[values.length]; for (int i = 0; i < values.length; i++) { @@ -147,6 +150,7 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo super(JMSServerControl.class, server.getHornetQServer().getStorageManager()); this.server = server; broadcaster = new NotificationBroadcasterSupport(); + server.getHornetQServer().getManagementService().addNotificationListener(this); } // Public -------------------------------------------------------- @@ -196,8 +200,6 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo connectorList, JMSServerControlImpl.convert(bindings)); } - - sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name); } finally { @@ -322,7 +324,39 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo try { - ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, ha, bindings); + ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl() + .setName(name) + .setHA(ha) + .setBindings(bindings) + .setFactoryType(JMSFactoryType.valueOf(cfType)) + .setClientID(clientID) + .setClientFailureCheckPeriod(clientFailureCheckPeriod) + .setConnectionTTL(connectionTTL) + .setCallTimeout(callTimeout) + .setCallFailoverTimeout(callFailoverTimeout) + .setMinLargeMessageSize(minLargeMessageSize) + .setCompressLargeMessages(compressLargeMessages) + .setConsumerWindowSize(consumerWindowSize) + .setConsumerMaxRate(consumerMaxRate) + .setConfirmationWindowSize(confirmationWindowSize) + .setProducerWindowSize(producerWindowSize) + .setProducerMaxRate(producerMaxRate) + .setBlockOnAcknowledge(blockOnAcknowledge) + .setBlockOnDurableSend(blockOnDurableSend) + .setBlockOnNonDurableSend(blockOnNonDurableSend) + .setAutoGroup(autoGroup) + .setPreAcknowledge(preAcknowledge) + .setTransactionBatchSize(transactionBatchSize) + .setDupsOKBatchSize(dupsOKBatchSize) + .setUseGlobalPools(useGlobalPools) + .setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize) + .setThreadPoolMaxSize(threadPoolMaxSize) + .setRetryInterval(retryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setMaxRetryInterval(maxRetryInterval) + .setReconnectAttempts(reconnectAttempts) + .setFailoverOnInitialConnection(failoverOnInitialConnection) + .setGroupID(groupId); if (useDiscovery) { @@ -338,46 +372,12 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo configuration.setConnectorNames(connectorNamesList); } - configuration.setFactoryType(JMSFactoryType.valueOf(cfType)); - configuration.setClientID(clientID); - configuration.setClientFailureCheckPeriod(clientFailureCheckPeriod); - configuration.setConnectionTTL(connectionTTL); - configuration.setCallTimeout(callTimeout); - configuration.setCallFailoverTimeout(callFailoverTimeout); - configuration.setMinLargeMessageSize(minLargeMessageSize); - configuration.setCompressLargeMessages(compressLargeMessages); - configuration.setConsumerWindowSize(consumerWindowSize); - configuration.setConsumerMaxRate(consumerMaxRate); - configuration.setConfirmationWindowSize(confirmationWindowSize); - configuration.setProducerWindowSize(producerWindowSize); - configuration.setProducerMaxRate(producerMaxRate); - configuration.setBlockOnAcknowledge(blockOnAcknowledge); - configuration.setBlockOnDurableSend(blockOnDurableSend); - configuration.setBlockOnNonDurableSend(blockOnNonDurableSend); - configuration.setAutoGroup(autoGroup); - configuration.setPreAcknowledge(preAcknowledge); - - if (loadBalancingPolicyClassName == null || loadBalancingPolicyClassName.trim().equals("")) + if (loadBalancingPolicyClassName != null && !loadBalancingPolicyClassName.trim().equals("")) { - loadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME; + configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName); } - configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName); - configuration.setTransactionBatchSize(transactionBatchSize); - configuration.setDupsOKBatchSize(dupsOKBatchSize); - configuration.setUseGlobalPools(useGlobalPools); - configuration.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize); - configuration.setThreadPoolMaxSize(threadPoolMaxSize); - configuration.setRetryInterval(retryInterval); - configuration.setRetryIntervalMultiplier(retryIntervalMultiplier); - configuration.setMaxRetryInterval(maxRetryInterval); - configuration.setReconnectAttempts(reconnectAttempts); - configuration.setFailoverOnInitialConnection(failoverOnInitialConnection); - configuration.setGroupID(groupId); - server.createConnectionFactory(true, configuration, bindings); - - sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name); } finally { @@ -427,12 +427,8 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo try { - boolean created = server.createQueue(true, name, selector, durable, JMSServerControlImpl.toArray(jndiBindings)); - if (created) - { - sendNotification(NotificationType.QUEUE_CREATED, name); - } - return created; + return server.createQueue(true, name, selector, durable, + JMSServerControlImpl.toArray(jndiBindings)); } finally { @@ -453,12 +449,7 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo try { - boolean destroyed = server.destroyQueue(name, removeConsumers); - if (destroyed) - { - sendNotification(NotificationType.QUEUE_DESTROYED, name); - } - return destroyed; + return server.destroyQueue(name, removeConsumers); } finally { @@ -479,12 +470,7 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo try { - boolean created = server.createTopic(true, topicName, JMSServerControlImpl.toArray(jndiBindings)); - if (created) - { - sendNotification(NotificationType.TOPIC_CREATED, topicName); - } - return created; + return server.createTopic(true, topicName, JMSServerControlImpl.toArray(jndiBindings)); } finally { @@ -506,12 +492,7 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo try { - boolean destroyed = server.destroyTopic(name, removeConsumers); - if (destroyed) - { - sendNotification(NotificationType.TOPIC_DESTROYED, name); - } - return destroyed; + return server.destroyTopic(name, removeConsumers); } finally { @@ -527,11 +508,7 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo try { - boolean destroyed = server.destroyConnectionFactory(name); - if (destroyed) - { - sendNotification(NotificationType.CONNECTION_FACTORY_DESTROYED, name); - } + server.destroyConnectionFactory(name); } finally { @@ -696,6 +673,38 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo } } + public boolean closeConsumerConnectionsForAddress(final String address) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.closeConsumerConnectionsForAddress(address); + } + finally + { + blockOnIO(); + } + } + + public boolean closeConnectionsForUser(final String userName) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.closeConnectionsForUser(userName); + } + finally + { + blockOnIO(); + } + } + public String[] listConnectionIDs() throws Exception { checkStarted(); @@ -889,12 +898,6 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo // Private ------------------------------------------------------- - private void sendNotification(final NotificationType type, final String message) - { - Notification notif = new Notification(type.toString(), this, notifSeq.incrementAndGet(), message); - broadcaster.sendNotification(notif); - } - private void checkStarted() { if (!server.isStarted()) @@ -905,16 +908,6 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo // Inner classes ------------------------------------------------- - public static enum NotificationType - { - QUEUE_CREATED, - QUEUE_DESTROYED, - TOPIC_CREATED, - TOPIC_DESTROYED, - CONNECTION_FACTORY_CREATED, - CONNECTION_FACTORY_DESTROYED; - } - public String[] listTargetDestinations(String sessionID) throws Exception { String[] addresses = server.getHornetQServer().getHornetQServerControl().listTargetAddresses(sessionID); @@ -1041,4 +1034,16 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo return obj; } + + @Override + public void onNotification(org.hornetq.core.server.management.Notification notification) + { + if (!(notification.getType() instanceof JMSNotificationType)) return; + JMSNotificationType type = (JMSNotificationType) notification.getType(); + TypedProperties prop = notification.getProperties(); + + this.broadcaster.sendNotification(new Notification(type.toString(), this, + notifSeq.incrementAndGet(), prop.getSimpleStringProperty(JMSNotificationType.MESSAGE).toString())); + } + } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSTopicControlImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSTopicControlImpl.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSTopicControlImpl.java index bdeceeb..f38ba8a 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSTopicControlImpl.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSTopicControlImpl.java @@ -315,13 +315,21 @@ public class JMSTopicControlImpl extends StandardMBean implements TopicControl String clientID = null; String subName = null; - if (queue.isDurable()) + if (queue.isDurable() && !queue.getName().startsWith(ResourceNames.JMS_TOPIC)) { Pair<String, String> pair = HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName() .toString()); clientID = pair.getA(); subName = pair.getB(); } + else if (queue.getName().startsWith(ResourceNames.JMS_TOPIC)) + { + // in the case of heirarchical topics the queue name will not follow the <part>.<part> pattern of normal + // durable subscribers so skip decomposing the name for the client ID and subscription name and just + // hard-code it + clientID = "HornetQ"; + subName = "HornetQ"; + } String filter = queue.getFilter() != null ? queue.getFilter() : null; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/HornetQJMSServerLogger.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/HornetQJMSServerLogger.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/HornetQJMSServerLogger.java index e5c79f3..49536e2 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/HornetQJMSServerLogger.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/HornetQJMSServerLogger.java @@ -12,8 +12,6 @@ */ package org.hornetq.jms.server; -import javax.management.ObjectName; - import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.jms.server.recovery.XARecoveryConfig; import org.jboss.logging.BasicLogger; @@ -26,7 +24,7 @@ import org.w3c.dom.Node; /** * @author <a href="mailto:[email protected]">Andy Taylor</a> - * 3/15/12 + * @author <a href="mailto:[email protected]">Martyn Taylor</a> * * Logger Code 12 * @@ -50,19 +48,6 @@ public interface HornetQJMSServerLogger extends BasicLogger HornetQJMSServerLogger LOGGER = Logger.getMessageLogger(HornetQJMSServerLogger.class, HornetQJMSServerLogger.class.getPackage().getName()); @LogMessage(level = Logger.Level.INFO) - @Message(id = 121000, value = "Failed to set up JMS bridge connections. Most probably the source or target servers are unavailable." + - " Will retry after a pause of {0} ms", format = Message.Format.MESSAGE_FORMAT) - void failedToSetUpBridge(long failureRetryInterval); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 121001, value = "JMS Bridge Succeeded in reconnecting to servers" , format = Message.Format.MESSAGE_FORMAT) - void bridgeReconnected(); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 121002, value = "Succeeded in connecting to servers" , format = Message.Format.MESSAGE_FORMAT) - void bridgeConnected(); - - @LogMessage(level = Logger.Level.INFO) @Message(id = 121003, value = "JMS Server Manager Running cached command for {0}" , format = Message.Format.MESSAGE_FORMAT) void serverRunningCachedCommand(Runnable run); @@ -77,34 +62,6 @@ public interface HornetQJMSServerLogger extends BasicLogger void invalidHostForConnector(String name, String newHost); @LogMessage(level = Logger.Level.WARN) - @Message(id = 122000, value = "Attempt to start JMS Bridge, but is already started" , format = Message.Format.MESSAGE_FORMAT) - void errorBridgeAlreadyStarted(); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 122001, value = "Failed to start JMS Bridge" , format = Message.Format.MESSAGE_FORMAT) - void errorStartingBridge(); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 122002, value = "Failed to unregisted JMS Bridge {0}" , format = Message.Format.MESSAGE_FORMAT) - void errorUnregisteringBridge(ObjectName objectName); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 122003, value = "JMS Bridge unable to set up connections, bridge will be stopped" , format = Message.Format.MESSAGE_FORMAT) - void errorConnectingBridge(); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 122004, value = "JMS Bridge Will retry after a pause of {0} ms" , format = Message.Format.MESSAGE_FORMAT) - void bridgeRetry(long failureRetryInterval); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 122005, value = "JMS Bridge unable to set up connections, bridge will not be started" , format = Message.Format.MESSAGE_FORMAT) - void bridgeNotStarted(); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 122006, value = "Detected failure on bridge connection" , format = Message.Format.MESSAGE_FORMAT) - void bridgeFailure(@Cause Exception e); - - @LogMessage(level = Logger.Level.WARN) @Message(id = 122007, value = "Queue {0} does not exist on the topic {1}. It was deleted manually probably." , format = Message.Format.MESSAGE_FORMAT) void noQueueOnTopic(String queueName, String name); @@ -113,14 +70,6 @@ public interface HornetQJMSServerLogger extends BasicLogger void recoveryConnectFailed(String s); @LogMessage(level = Logger.Level.WARN) - @Message(id = 122009, value = "JMS Bridge failed to send + acknowledge batch, closing JMS objects" , format = Message.Format.MESSAGE_FORMAT) - void bridgeAckError(@Cause Exception e); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 122010, value = "Failed to connect JMS Bridge", format = Message.Format.MESSAGE_FORMAT) - void bridgeConnectError(@Cause Exception e); - - @LogMessage(level = Logger.Level.WARN) @Message(id = 122011, value = "error unbinding {0} from JNDI" , format = Message.Format.MESSAGE_FORMAT) void jndiUnbindError(@Cause Exception e, String key); @@ -161,10 +110,6 @@ public interface HornetQJMSServerLogger extends BasicLogger void jmsConfigMissingKey(Node e); @LogMessage(level = Logger.Level.ERROR) - @Message(id = 124001, value = "Failed to start source connection" , format = Message.Format.MESSAGE_FORMAT) - void jmsBridgeSrcConnectError(@Cause Exception e); - - @LogMessage(level = Logger.Level.ERROR) @Message(id = 124002, value = "Failed to start JMS deployer" , format = Message.Format.MESSAGE_FORMAT) void jmsDeployerStartError(@Cause Exception e); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/JMSServerManager.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/JMSServerManager.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/JMSServerManager.java index 35ef0a9..f593077 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/JMSServerManager.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/JMSServerManager.java @@ -284,6 +284,10 @@ public interface JMSServerManager extends HornetQComponent boolean closeConnectionsForAddress(String ipAddress) throws Exception; + boolean closeConsumerConnectionsForAddress(String address) throws Exception; + + boolean closeConnectionsForUser(String address) throws Exception; + String[] listConnectionIDs() throws Exception; String[] listSessions(String connectionID) throws Exception;
