http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQStreamMessage.java
----------------------------------------------------------------------
diff --git 
a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQStreamMessage.java
 
b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQStreamMessage.java
index 199df55..4f3e899 100644
--- 
a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQStreamMessage.java
+++ 
b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQStreamMessage.java
@@ -19,11 +19,24 @@ import javax.jms.StreamMessage;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.Message;
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.client.impl.ClientMessageImpl;
 import org.hornetq.utils.DataConstants;
 
+import static org.hornetq.reader.StreamMessageUtil.streamReadBoolean;
+import static org.hornetq.reader.StreamMessageUtil.streamReadByte;
+import static org.hornetq.reader.StreamMessageUtil.streamReadBytes;
+import static org.hornetq.reader.StreamMessageUtil.streamReadChar;
+import static org.hornetq.reader.StreamMessageUtil.streamReadDouble;
+import static org.hornetq.reader.StreamMessageUtil.streamReadFloat;
+import static org.hornetq.reader.StreamMessageUtil.streamReadInteger;
+import static org.hornetq.reader.StreamMessageUtil.streamReadLong;
+import static org.hornetq.reader.StreamMessageUtil.streamReadObject;
+import static org.hornetq.reader.StreamMessageUtil.streamReadShort;
+import static org.hornetq.reader.StreamMessageUtil.streamReadString;
+
 /**
  * HornetQ implementation of a JMS StreamMessage.
  *
@@ -92,18 +105,11 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
       checkRead();
       try
       {
-         byte type = getBuffer().readByte();
-
-         switch (type)
-         {
-            case DataConstants.BOOLEAN:
-               return getBuffer().readBoolean();
-            case DataConstants.STRING:
-               String s = getBuffer().readNullableString();
-               return Boolean.valueOf(s);
-            default:
-               throw new MessageFormatException("Invalid conversion, type byte 
was " + type);
-         }
+         return streamReadBoolean(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -114,29 +120,18 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
    public byte readByte() throws JMSException
    {
       checkRead();
-      int index = getBuffer().readerIndex();
+
       try
       {
-         byte type = getBuffer().readByte();
-         switch (type)
-         {
-            case DataConstants.BYTE:
-               return getBuffer().readByte();
-            case DataConstants.STRING:
-               String s = getBuffer().readNullableString();
-               return Byte.parseByte(s);
-            default:
-               throw new MessageFormatException("Invalid conversion");
-         }
+         return streamReadByte(message);
       }
-      catch (IndexOutOfBoundsException e)
+      catch (IllegalStateException e)
       {
-         throw new MessageEOFException("");
+         throw new MessageFormatException(e.getMessage());
       }
-      catch (NumberFormatException e)
+      catch (IndexOutOfBoundsException e)
       {
-         getBuffer().readerIndex(index);
-         throw e;
+         throw new MessageEOFException("");
       }
    }
 
@@ -145,19 +140,11 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
       checkRead();
       try
       {
-         byte type = getBuffer().readByte();
-         switch (type)
-         {
-            case DataConstants.BYTE:
-               return getBuffer().readByte();
-            case DataConstants.SHORT:
-               return getBuffer().readShort();
-            case DataConstants.STRING:
-               String s = getBuffer().readNullableString();
-               return Short.parseShort(s);
-            default:
-               throw new MessageFormatException("Invalid conversion");
-         }
+         return streamReadShort(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -170,24 +157,11 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
       checkRead();
       try
       {
-         byte type = getBuffer().readByte();
-         switch (type)
-         {
-            case DataConstants.CHAR:
-               return (char)getBuffer().readShort();
-            case DataConstants.STRING:
-               String str = getBuffer().readNullableString();
-               if (str == null)
-               {
-                  throw new NullPointerException("Invalid conversion");
-               }
-               else
-               {
-                  throw new MessageFormatException("Invalid conversion");
-               }
-            default:
-               throw new MessageFormatException("Invalid conversion");
-         }
+         return streamReadChar(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -200,21 +174,11 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
       checkRead();
       try
       {
-         byte type = getBuffer().readByte();
-         switch (type)
-         {
-            case DataConstants.BYTE:
-               return getBuffer().readByte();
-            case DataConstants.SHORT:
-               return getBuffer().readShort();
-            case DataConstants.INT:
-               return getBuffer().readInt();
-            case DataConstants.STRING:
-               String s = getBuffer().readNullableString();
-               return Integer.parseInt(s);
-            default:
-               throw new MessageFormatException("Invalid conversion");
-         }
+         return streamReadInteger(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -227,23 +191,11 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
       checkRead();
       try
       {
-         byte type = getBuffer().readByte();
-         switch (type)
-         {
-            case DataConstants.BYTE:
-               return getBuffer().readByte();
-            case DataConstants.SHORT:
-               return getBuffer().readShort();
-            case DataConstants.INT:
-               return getBuffer().readInt();
-            case DataConstants.LONG:
-               return getBuffer().readLong();
-            case DataConstants.STRING:
-               String s = getBuffer().readNullableString();
-               return Long.parseLong(s);
-            default:
-               throw new MessageFormatException("Invalid conversion");
-         }
+         return streamReadLong(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -256,17 +208,11 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
       checkRead();
       try
       {
-         byte type = getBuffer().readByte();
-         switch (type)
-         {
-            case DataConstants.FLOAT:
-               return Float.intBitsToFloat(getBuffer().readInt());
-            case DataConstants.STRING:
-               String s = getBuffer().readNullableString();
-               return Float.parseFloat(s);
-            default:
-               throw new MessageFormatException("Invalid conversion");
-         }
+         return streamReadFloat(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -279,19 +225,11 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
       checkRead();
       try
       {
-         byte type = getBuffer().readByte();
-         switch (type)
-         {
-            case DataConstants.FLOAT:
-               return Float.intBitsToFloat(getBuffer().readInt());
-            case DataConstants.DOUBLE:
-               return Double.longBitsToDouble(getBuffer().readLong());
-            case DataConstants.STRING:
-               String s = getBuffer().readNullableString();
-               return Double.parseDouble(s);
-            default:
-               throw new MessageFormatException("Invalid conversion: " + type);
-         }
+         return streamReadDouble(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -304,30 +242,11 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
       checkRead();
       try
       {
-         byte type = getBuffer().readByte();
-         switch (type)
-         {
-            case DataConstants.BOOLEAN:
-               return String.valueOf(getBuffer().readBoolean());
-            case DataConstants.BYTE:
-               return String.valueOf(getBuffer().readByte());
-            case DataConstants.SHORT:
-               return String.valueOf(getBuffer().readShort());
-            case DataConstants.CHAR:
-               return String.valueOf((char)getBuffer().readShort());
-            case DataConstants.INT:
-               return String.valueOf(getBuffer().readInt());
-            case DataConstants.LONG:
-               return String.valueOf(getBuffer().readLong());
-            case DataConstants.FLOAT:
-               return 
String.valueOf(Float.intBitsToFloat(getBuffer().readInt()));
-            case DataConstants.DOUBLE:
-               return 
String.valueOf(Double.longBitsToDouble(getBuffer().readLong()));
-            case DataConstants.STRING:
-               return getBuffer().readNullableString();
-            default:
-               throw new MessageFormatException("Invalid conversion");
-         }
+         return streamReadString(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -335,35 +254,24 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
       }
    }
 
-   private int len;
+   /**
+    * len here is used to control how many more bytes to read
+    */
+   private int len = 0;
 
    public int readBytes(final byte[] value) throws JMSException
    {
       checkRead();
       try
       {
-         if (len == -1)
-         {
-            len = 0;
-            return -1;
-         }
-         else if (len == 0)
-         {
-            byte type = getBuffer().readByte();
-            if (type != DataConstants.BYTES)
-            {
-               throw new MessageFormatException("Invalid conversion");
-            }
-            len = getBuffer().readInt();
-         }
-         int read = Math.min(value.length, len);
-         getBuffer().readBytes(value, 0, read);
-         len -= read;
-         if (len == 0)
-         {
-            len = -1;
-         }
-         return read;
+         Pair<Integer, Integer> pairRead = streamReadBytes(message, len, 
value);
+
+         len = pairRead.getA();
+         return pairRead.getB();
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -376,35 +284,11 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
       checkRead();
       try
       {
-         byte type = getBuffer().readByte();
-         switch (type)
-         {
-            case DataConstants.BOOLEAN:
-               return getBuffer().readBoolean();
-            case DataConstants.BYTE:
-               return getBuffer().readByte();
-            case DataConstants.SHORT:
-               return getBuffer().readShort();
-            case DataConstants.CHAR:
-               return (char)getBuffer().readShort();
-            case DataConstants.INT:
-               return getBuffer().readInt();
-            case DataConstants.LONG:
-               return getBuffer().readLong();
-            case DataConstants.FLOAT:
-               return Float.intBitsToFloat(getBuffer().readInt());
-            case DataConstants.DOUBLE:
-               return Double.longBitsToDouble(getBuffer().readLong());
-            case DataConstants.STRING:
-               return getBuffer().readNullableString();
-            case DataConstants.BYTES:
-               int bufferLen = getBuffer().readInt();
-               byte[] bytes = new byte[bufferLen];
-               getBuffer().readBytes(bytes);
-               return bytes;
-            default:
-               throw new MessageFormatException("Invalid conversion");
-         }
+         return streamReadObject(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -555,7 +439,7 @@ public final class HornetQStreamMessage extends 
HornetQMessage implements Stream
    // HornetQRAMessage overrides ----------------------------------------
 
    @Override
-   public void clearBody()
+   public void clearBody() throws JMSException
    {
       super.clearBody();
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQTextMessage.java
----------------------------------------------------------------------
diff --git 
a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQTextMessage.java
 
b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQTextMessage.java
index 35b587b..96b7a45 100644
--- 
a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQTextMessage.java
+++ 
b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQTextMessage.java
@@ -15,13 +15,16 @@ package org.hornetq.jms.client;
 import javax.jms.JMSException;
 import javax.jms.TextMessage;
 
-import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 
+import static org.hornetq.reader.TextMessageUtil.readBodyText;
+import static org.hornetq.reader.TextMessageUtil.writeBodyText;
+
+
 /**
  * HornetQ implementation of a JMS TextMessage.
  * <br>
@@ -85,10 +88,6 @@ public class HornetQTextMessage extends HornetQMessage 
implements TextMessage
    {
       checkWrite();
 
-      HornetQBuffer buff = message.getBodyBuffer();
-
-      buff.clear();
-
       if (text != null)
       {
          this.text = new SimpleString(text);
@@ -98,7 +97,7 @@ public class HornetQTextMessage extends HornetQMessage 
implements TextMessage
          this.text = null;
       }
 
-      buff.writeNullableSimpleString(this.text);
+      writeBodyText(message, this.text);
    }
 
    public String getText()
@@ -114,7 +113,7 @@ public class HornetQTextMessage extends HornetQMessage 
implements TextMessage
    }
 
    @Override
-   public void clearBody()
+   public void clearBody() throws JMSException
    {
       super.clearBody();
 
@@ -128,7 +127,7 @@ public class HornetQTextMessage extends HornetQMessage 
implements TextMessage
    {
       super.doBeforeReceive();
 
-      text = message.getBodyBuffer().readNullableSimpleString();
+      text = readBodyText(message);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/pom.xml
----------------------------------------------------------------------
diff --git a/hornetq-jms-server/pom.xml b/hornetq-jms-server/pom.xml
index c0323f6..60a7c46 100644
--- a/hornetq-jms-server/pom.xml
+++ b/hornetq-jms-server/pom.xml
@@ -42,7 +42,7 @@
       </dependency>
       <dependency>
          <groupId>org.jboss.spec.javax.transaction</groupId>
-         <artifactId>jboss-transaction-api_1.1_spec</artifactId>
+         <artifactId>jboss-transaction-api_1.2_spec</artifactId>
       </dependency>
       <dependency>
          <groupId>org.jboss.jbossts.jts</groupId>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/HornetQJMSBridgeLogger.java
----------------------------------------------------------------------
diff --git 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/HornetQJMSBridgeLogger.java
 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/HornetQJMSBridgeLogger.java
new file mode 100644
index 0000000..688c7d0
--- /dev/null
+++ 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/HornetQJMSBridgeLogger.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.bridge;
+
+import javax.management.ObjectName;
+
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.Logger;
+import org.jboss.logging.annotations.Cause;
+import org.jboss.logging.annotations.LogMessage;
+import org.jboss.logging.annotations.Message;
+import org.jboss.logging.annotations.MessageLogger;
+
+/**
+ * @author <a href="mailto:[email protected]";>Martyn Taylor</a>
+ *
+ * Logger Code 34
+ *
+ * each message id must be 6 digits long starting with 12, the 3rd digit 
donates the level so
+ *
+ * INF0  1
+ * WARN  2
+ * DEBUG 3
+ * ERROR 4
+ * TRACE 5
+ * FATAL 6
+ *
+ * so an INFO message would be 341000 to 341999
+ */
+@MessageLogger(projectCode = "HQ")
+public interface HornetQJMSBridgeLogger extends BasicLogger
+{
+   /**
+    * The default logger.
+    */
+   HornetQJMSBridgeLogger LOGGER = 
Logger.getMessageLogger(HornetQJMSBridgeLogger.class, 
HornetQJMSBridgeLogger.class.getPackage().getName());
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 341000, value = "Failed to set up JMS bridge connections. 
Most probably the source or target servers are unavailable." +
+         " Will retry after a pause of {0} ms", format = 
Message.Format.MESSAGE_FORMAT)
+   void failedToSetUpBridge(long failureRetryInterval);
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 341001, value = "JMS Bridge Succeeded in reconnecting to 
servers" , format = Message.Format.MESSAGE_FORMAT)
+   void bridgeReconnected();
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 341002, value = "Succeeded in connecting to servers" , format 
= Message.Format.MESSAGE_FORMAT)
+   void bridgeConnected();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 342000, value = "Attempt to start JMS Bridge, but is already 
started" , format = Message.Format.MESSAGE_FORMAT)
+   void errorBridgeAlreadyStarted();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 342001, value = "Failed to start JMS Bridge" , format = 
Message.Format.MESSAGE_FORMAT)
+   void errorStartingBridge();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 342002, value = "Failed to unregisted JMS Bridge {0}" , 
format = Message.Format.MESSAGE_FORMAT)
+   void errorUnregisteringBridge(ObjectName objectName);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 342003, value = "JMS Bridge unable to set up connections, 
bridge will be stopped" , format = Message.Format.MESSAGE_FORMAT)
+   void errorConnectingBridge();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 342004, value = "JMS Bridge Will retry after a pause of {0} 
ms" , format = Message.Format.MESSAGE_FORMAT)
+   void bridgeRetry(long failureRetryInterval);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 342005, value = "JMS Bridge unable to set up connections, 
bridge will not be started" , format = Message.Format.MESSAGE_FORMAT)
+   void bridgeNotStarted();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 342006, value = "Detected failure on bridge connection" , 
format = Message.Format.MESSAGE_FORMAT)
+   void bridgeFailure(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 342009, value = "JMS Bridge failed to send + acknowledge 
batch, closing JMS objects"  , format = Message.Format.MESSAGE_FORMAT)
+   void bridgeAckError(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 342010, value = "Failed to connect JMS Bridge", format = 
Message.Format.MESSAGE_FORMAT)
+   void bridgeConnectError(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 344001, value = "Failed to start source connection" , format 
= Message.Format.MESSAGE_FORMAT)
+   void jmsBridgeSrcConnectError(@Cause Exception e);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
index 09eec8c..ace70ef 100644
--- 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
+++ 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
@@ -52,6 +52,7 @@ import org.hornetq.api.core.client.FailoverEventType;
 import org.hornetq.api.jms.HornetQJMSConstants;
 import org.hornetq.jms.bridge.ConnectionFactoryFactory;
 import org.hornetq.jms.bridge.DestinationFactory;
+import org.hornetq.jms.bridge.HornetQJMSBridgeLogger;
 import org.hornetq.jms.bridge.JMSBridge;
 import org.hornetq.jms.bridge.JMSBridgeControl;
 import org.hornetq.jms.bridge.QualityOfServiceMode;
@@ -59,7 +60,6 @@ import org.hornetq.jms.client.HornetQConnection;
 import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.hornetq.jms.client.HornetQMessage;
 import org.hornetq.jms.server.HornetQJMSServerBundle;
-import org.hornetq.jms.server.HornetQJMSServerLogger;
 import org.hornetq.jms.server.recovery.HornetQRegistryBase;
 import org.hornetq.jms.server.recovery.XARecoveryConfig;
 import org.hornetq.utils.ClassloadingUtil;
@@ -71,13 +71,14 @@ import org.hornetq.utils.SensitiveDataCodec;
  * A JMSBridge
  *
  * @author <a href="mailto:[email protected]";>Tim Fox</a>
+ * @author <a href="mailto:[email protected]";>Martyn Taylor</a>
  * @version <tt>$Revision:4566 $</tt>
  */
 public final class JMSBridgeImpl implements JMSBridge
 {
    private static final String[] RESOURCE_RECOVERY_CLASS_NAMES = new 
String[]{"org.jboss.as.messaging.jms.AS7RecoveryRegistry"};
 
-   private static boolean trace = 
HornetQJMSServerLogger.LOGGER.isTraceEnabled();
+   private static boolean trace = 
HornetQJMSBridgeLogger.LOGGER.isTraceEnabled();
 
    private static final int TEN_YEARS = 60 * 60 * 24 * 365 * 10; // in ms
 
@@ -156,6 +157,10 @@ public final class JMSBridgeImpl implements JMSBridge
 
    private boolean failed;
 
+   private boolean connectedSource = false;
+
+   private boolean connectedTarget = false;
+
    private int forwardMode;
 
    private String transactionManagerLocatorClass = 
"org.hornetq.integration.jboss.tm.JBoss5TransactionManagerLocator";
@@ -344,7 +349,7 @@ public final class JMSBridgeImpl implements JMSBridge
                this.objectName = ObjectName.getInstance(objectName);
                StandardMBean mbean = new StandardMBean(controlBean, 
JMSBridgeControl.class);
                mbeanServer.registerMBean(mbean, this.objectName);
-               HornetQJMSServerLogger.LOGGER.debug("Registered JMSBridge 
instance as: " + this.objectName.getCanonicalName());
+               HornetQJMSBridgeLogger.LOGGER.debug("Registered JMSBridge 
instance as: " + this.objectName.getCanonicalName());
             }
             catch (Exception e)
             {
@@ -359,7 +364,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Created " + this);
+         HornetQJMSBridgeLogger.LOGGER.trace("Created " + this);
       }
    }
 
@@ -376,13 +381,13 @@ public final class JMSBridgeImpl implements JMSBridge
 
       if (started)
       {
-         HornetQJMSServerLogger.LOGGER.errorBridgeAlreadyStarted();
+         HornetQJMSBridgeLogger.LOGGER.errorBridgeAlreadyStarted();
          return;
       }
 
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Starting " + this);
+         HornetQJMSBridgeLogger.LOGGER.trace("Starting " + this);
       }
 
       // bridge has been stopped and is restarted
@@ -418,11 +423,13 @@ public final class JMSBridgeImpl implements JMSBridge
 
       if (ok)
       {
+         connectedSource = true;
+         connectedTarget = true;
          startSource();
       }
       else
       {
-         HornetQJMSServerLogger.LOGGER.errorStartingBridge();
+         HornetQJMSBridgeLogger.LOGGER.errorStartingBridge();
          handleFailureOnStartup();
       }
 
@@ -440,7 +447,7 @@ public final class JMSBridgeImpl implements JMSBridge
       {
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Starting time checker 
thread");
+            HornetQJMSBridgeLogger.LOGGER.trace("Starting time checker 
thread");
          }
 
          timeChecker = new BatchTimeChecker();
@@ -450,7 +457,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Started time checker thread");
+            HornetQJMSBridgeLogger.LOGGER.trace("Started time checker thread");
          }
       }
 
@@ -458,7 +465,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Started " + this);
+         HornetQJMSBridgeLogger.LOGGER.trace("Started " + this);
       }
    }
 
@@ -505,9 +512,16 @@ public final class JMSBridgeImpl implements JMSBridge
       {
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Stopping " + this);
+            HornetQJMSBridgeLogger.LOGGER.trace("Stopping " + this);
+         }
+         if (!connectedSource && sourceConn != null)
+         {
+            sourceConn.close();
+         }
+         if (!connectedTarget && targetConn != null)
+         {
+            targetConn.close();
          }
-
          synchronized (lock)
          {
             started = false;
@@ -527,7 +541,7 @@ public final class JMSBridgeImpl implements JMSBridge
             // Terminate any transaction
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Rolling back remaining 
tx");
+               HornetQJMSBridgeLogger.LOGGER.trace("Rolling back remaining 
tx");
             }
 
             try
@@ -538,13 +552,13 @@ public final class JMSBridgeImpl implements JMSBridge
             {
                if (JMSBridgeImpl.trace)
                {
-                  HornetQJMSServerLogger.LOGGER.trace("Failed to rollback", 
ignore);
+                  HornetQJMSBridgeLogger.LOGGER.trace("Failed to rollback", 
ignore);
                }
             }
 
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Rolled back remaining tx");
+               HornetQJMSBridgeLogger.LOGGER.trace("Rolled back remaining tx");
             }
          }
 
@@ -556,7 +570,7 @@ public final class JMSBridgeImpl implements JMSBridge
          {
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Failed to close source 
conn", ignore);
+               HornetQJMSBridgeLogger.LOGGER.trace("Failed to close source 
conn", ignore);
             }
          }
 
@@ -570,14 +584,14 @@ public final class JMSBridgeImpl implements JMSBridge
             {
                if (JMSBridgeImpl.trace)
                {
-                  HornetQJMSServerLogger.LOGGER.trace("Failed to close target 
conn", ignore);
+                  HornetQJMSBridgeLogger.LOGGER.trace("Failed to close target 
conn", ignore);
                }
             }
          }
 
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Stopped " + this);
+            HornetQJMSBridgeLogger.LOGGER.trace("Stopped " + this);
          }
       }
    }
@@ -597,7 +611,7 @@ public final class JMSBridgeImpl implements JMSBridge
          }
          catch (Exception e)
          {
-            HornetQJMSServerLogger.LOGGER.errorUnregisteringBridge(objectName);
+            HornetQJMSBridgeLogger.LOGGER.errorUnregisteringBridge(objectName);
          }
       }
    }
@@ -608,7 +622,7 @@ public final class JMSBridgeImpl implements JMSBridge
    {
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Pausing " + this);
+         HornetQJMSBridgeLogger.LOGGER.trace("Pausing " + this);
       }
 
       synchronized (lock)
@@ -620,7 +634,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Paused " + this);
+         HornetQJMSBridgeLogger.LOGGER.trace("Paused " + this);
       }
    }
 
@@ -628,7 +642,7 @@ public final class JMSBridgeImpl implements JMSBridge
    {
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Resuming " + this);
+         HornetQJMSBridgeLogger.LOGGER.trace("Resuming " + this);
       }
 
       synchronized (lock)
@@ -640,7 +654,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Resumed " + this);
+         HornetQJMSBridgeLogger.LOGGER.trace("Resumed " + this);
       }
    }
 
@@ -955,7 +969,7 @@ public final class JMSBridgeImpl implements JMSBridge
    {
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Enlisting resources in tx");
+         HornetQJMSBridgeLogger.LOGGER.trace("Enlisting resources in tx");
       }
 
       XAResource resSource = ((XASession) sourceSession).getXAResource();
@@ -968,7 +982,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Enlisted resources in tx");
+         HornetQJMSBridgeLogger.LOGGER.trace("Enlisted resources in tx");
       }
    }
 
@@ -976,7 +990,7 @@ public final class JMSBridgeImpl implements JMSBridge
    {
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Delisting resources from tx");
+         HornetQJMSBridgeLogger.LOGGER.trace("Delisting resources from tx");
       }
 
       XAResource resSource = ((XASession) sourceSession).getXAResource();
@@ -989,7 +1003,7 @@ public final class JMSBridgeImpl implements JMSBridge
       {
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Failed to delist source 
resource", e);
+            HornetQJMSBridgeLogger.LOGGER.trace("Failed to delist source 
resource", e);
          }
       }
 
@@ -1003,13 +1017,13 @@ public final class JMSBridgeImpl implements JMSBridge
       {
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Failed to delist target 
resource", e);
+            HornetQJMSBridgeLogger.LOGGER.trace("Failed to delist target 
resource", e);
          }
       }
 
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Delisted resources from tx");
+         HornetQJMSBridgeLogger.LOGGER.trace("Delisted resources from tx");
       }
    }
 
@@ -1017,7 +1031,7 @@ public final class JMSBridgeImpl implements JMSBridge
    {
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Starting JTA transaction");
+         HornetQJMSBridgeLogger.LOGGER.trace("Starting JTA transaction");
       }
 
       TransactionManager tm = getTm();
@@ -1037,7 +1051,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Started JTA transaction");
+         HornetQJMSBridgeLogger.LOGGER.trace("Started JTA transaction");
       }
 
       return tx;
@@ -1073,7 +1087,8 @@ public final class JMSBridgeImpl implements JMSBridge
    private Connection createConnection(final String username, final String 
password,
                                        final ConnectionFactoryFactory cff,
                                        final String clientID,
-                                       final boolean isXA) throws Exception
+                                       final boolean isXA,
+                                       boolean isSource) throws Exception
    {
       Connection conn;
 
@@ -1095,7 +1110,7 @@ public final class JMSBridgeImpl implements JMSBridge
          {
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Creating an XA 
connection");
+               HornetQJMSBridgeLogger.LOGGER.trace("Creating an XA 
connection");
             }
             conn = ((XAConnectionFactory) cf).createXAConnection();
          }
@@ -1103,7 +1118,7 @@ public final class JMSBridgeImpl implements JMSBridge
          {
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Creating a non XA 
connection");
+               HornetQJMSBridgeLogger.LOGGER.trace("Creating a non XA 
connection");
             }
             conn = ((ConnectionFactory) cf).createConnection();
          }
@@ -1114,7 +1129,7 @@ public final class JMSBridgeImpl implements JMSBridge
          {
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Creating an XA 
connection");
+               HornetQJMSBridgeLogger.LOGGER.trace("Creating an XA 
connection");
             }
             conn = ((XAConnectionFactory) cf).createXAConnection(username, 
password);
          }
@@ -1122,7 +1137,7 @@ public final class JMSBridgeImpl implements JMSBridge
          {
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Creating a non XA 
connection");
+               HornetQJMSBridgeLogger.LOGGER.trace("Creating a non XA 
connection");
             }
             conn = ((ConnectionFactory) cf).createConnection(username, 
password);
          }
@@ -1144,12 +1159,12 @@ public final class JMSBridgeImpl implements JMSBridge
          if (ha)
          {
             HornetQConnection hornetQConn = (HornetQConnection) conn;
-            failoverListener = new BridgeFailoverListener();
+            failoverListener = new BridgeFailoverListener(isSource);
             hornetQConn.setFailoverListener(failoverListener);
          }
       }
 
-      conn.setExceptionListener(new BridgeExceptionListener(ha, 
failoverListener));
+      conn.setExceptionListener(new BridgeExceptionListener(ha, 
failoverListener, isSource));
 
       return conn;
    }
@@ -1223,7 +1238,7 @@ public final class JMSBridgeImpl implements JMSBridge
          {
             // We simply use a single local transacted session for consuming 
and sending
 
-            sourceConn = createConnection(sourceUsername, sourcePassword, 
sourceCff, clientID, false);
+            sourceConn = createConnection(sourceUsername, sourcePassword, 
sourceCff, clientID, false, true);
             sourceSession = sourceConn.createSession(true, 
Session.SESSION_TRANSACTED);
          }
          else // bridging across different servers
@@ -1234,20 +1249,20 @@ public final class JMSBridgeImpl implements JMSBridge
                // Create an XASession for consuming from the source
                if (JMSBridgeImpl.trace)
                {
-                  HornetQJMSServerLogger.LOGGER.trace("Creating XA source 
session");
+                  HornetQJMSBridgeLogger.LOGGER.trace("Creating XA source 
session");
                }
 
-               sourceConn = createConnection(sourceUsername, sourcePassword, 
sourceCff, clientID, true);
+               sourceConn = createConnection(sourceUsername, sourcePassword, 
sourceCff, clientID, true, true);
                sourceSession = ((XAConnection) sourceConn).createXASession();
             }
             else // QoS = DUPLICATES_OK || AT_MOST_ONCE
             {
                if (JMSBridgeImpl.trace)
                {
-                  HornetQJMSServerLogger.LOGGER.trace("Creating non XA source 
session");
+                  HornetQJMSBridgeLogger.LOGGER.trace("Creating non XA source 
session");
                }
 
-               sourceConn = createConnection(sourceUsername, sourcePassword, 
sourceCff, clientID, false);
+               sourceConn = createConnection(sourceUsername, sourcePassword, 
sourceCff, clientID, false, true);
                if (qualityOfServiceMode == QualityOfServiceMode.AT_MOST_ONCE 
&& maxBatchSize == 1)
                {
                   sourceSession = sourceConn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -1298,12 +1313,12 @@ public final class JMSBridgeImpl implements JMSBridge
             {
                if (JMSBridgeImpl.trace)
                {
-                  HornetQJMSServerLogger.LOGGER.trace("Creating XA dest 
session");
+                  HornetQJMSBridgeLogger.LOGGER.trace("Creating XA dest 
session");
                }
 
                // Create an XA session for sending to the destination
 
-               targetConn = createConnection(targetUsername, targetPassword, 
targetCff, null, true);
+               targetConn = createConnection(targetUsername, targetPassword, 
targetCff, null, true, false);
 
                targetSession = ((XAConnection) targetConn).createXASession();
             }
@@ -1311,7 +1326,7 @@ public final class JMSBridgeImpl implements JMSBridge
             {
                if (JMSBridgeImpl.trace)
                {
-                  HornetQJMSServerLogger.LOGGER.trace("Creating non XA dest 
session");
+                  HornetQJMSBridgeLogger.LOGGER.trace("Creating non XA dest 
session");
                }
 
                // Create a standard session for sending to the target
@@ -1320,7 +1335,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
                boolean transacted = maxBatchSize > 1;
 
-               targetConn = createConnection(targetUsername, targetPassword, 
targetCff, null, false);
+               targetConn = createConnection(targetUsername, targetPassword, 
targetCff, null, false, false);
 
                targetSession = targetConn.createSession(transacted, transacted 
? Session.SESSION_TRANSACTED
                   : Session.AUTO_ACKNOWLEDGE);
@@ -1331,7 +1346,7 @@ public final class JMSBridgeImpl implements JMSBridge
          {
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Starting JTA transaction");
+               HornetQJMSBridgeLogger.LOGGER.trace("Starting JTA transaction");
             }
 
             tx = startTx();
@@ -1350,7 +1365,7 @@ public final class JMSBridgeImpl implements JMSBridge
          // If this fails we should attempt to cleanup or we might end up in 
some weird state
 
          // Adding a log.warn, so the use may see the cause of the failure and 
take actions
-         HornetQJMSServerLogger.LOGGER.bridgeConnectError(e);
+         HornetQJMSBridgeLogger.LOGGER.bridgeConnectError(e);
 
          cleanup();
 
@@ -1369,7 +1384,7 @@ public final class JMSBridgeImpl implements JMSBridge
       {
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Failed to stop source 
connection", ignore);
+            HornetQJMSBridgeLogger.LOGGER.trace("Failed to stop source 
connection", ignore);
          }
       }
 
@@ -1383,7 +1398,7 @@ public final class JMSBridgeImpl implements JMSBridge
          {
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Failed to delist 
resources", ignore);
+               HornetQJMSBridgeLogger.LOGGER.trace("Failed to delist 
resources", ignore);
             }
          }
          try
@@ -1395,7 +1410,7 @@ public final class JMSBridgeImpl implements JMSBridge
          {
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Failed to rollback", 
ignore);
+               HornetQJMSBridgeLogger.LOGGER.trace("Failed to rollback", 
ignore);
             }
          }
       }
@@ -1409,7 +1424,7 @@ public final class JMSBridgeImpl implements JMSBridge
       {
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Failed to close source 
connection", ignore);
+            HornetQJMSBridgeLogger.LOGGER.trace("Failed to close source 
connection", ignore);
          }
       }
       try
@@ -1423,7 +1438,7 @@ public final class JMSBridgeImpl implements JMSBridge
       {
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Failed to close target 
connection", ignore);
+            HornetQJMSBridgeLogger.LOGGER.trace("Failed to close target 
connection", ignore);
          }
       }
    }
@@ -1447,7 +1462,7 @@ public final class JMSBridgeImpl implements JMSBridge
    {
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Setting up connections");
+         HornetQJMSBridgeLogger.LOGGER.trace("Setting up connections");
       }
 
       int count = 0;
@@ -1468,7 +1483,7 @@ public final class JMSBridgeImpl implements JMSBridge
             break;
          }
 
-         
HornetQJMSServerLogger.LOGGER.failedToSetUpBridge(failureRetryInterval);
+         
HornetQJMSBridgeLogger.LOGGER.failedToSetUpBridge(failureRetryInterval);
 
          pause(failureRetryInterval);
       }
@@ -1481,7 +1496,7 @@ public final class JMSBridgeImpl implements JMSBridge
    {
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Sending batch of " + 
messages.size() + " messages");
+         HornetQJMSBridgeLogger.LOGGER.trace("Sending batch of " + 
messages.size() + " messages");
       }
 
       if (paused)
@@ -1489,7 +1504,7 @@ public final class JMSBridgeImpl implements JMSBridge
          // Don't send now
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Paused, so not sending now");
+            HornetQJMSBridgeLogger.LOGGER.trace("Paused, so not sending now");
          }
 
          return;
@@ -1520,14 +1535,14 @@ public final class JMSBridgeImpl implements JMSBridge
 
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Client acking source 
session");
+               HornetQJMSBridgeLogger.LOGGER.trace("Client acking source 
session");
             }
 
             messages.getLast().acknowledge();
 
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Client acked source 
session");
+               HornetQJMSBridgeLogger.LOGGER.trace("Client acked source 
session");
             }
          }
 
@@ -1542,7 +1557,7 @@ public final class JMSBridgeImpl implements JMSBridge
             }
             catch (TransactionRolledbackException e)
             {
-               HornetQJMSServerLogger.LOGGER.warn(e.getMessage() + ", retrying 
TX", e);
+               HornetQJMSBridgeLogger.LOGGER.warn(e.getMessage() + ", retrying 
TX", e);
                exHappened = true;
             }
          }
@@ -1554,14 +1569,14 @@ public final class JMSBridgeImpl implements JMSBridge
 
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Committing target 
session");
+               HornetQJMSBridgeLogger.LOGGER.trace("Committing target 
session");
             }
 
             targetSession.commit();
 
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Committed target session");
+               HornetQJMSBridgeLogger.LOGGER.trace("Committed target session");
             }
          }
 
@@ -1574,30 +1589,36 @@ public final class JMSBridgeImpl implements JMSBridge
 
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Client acking source 
session");
+               HornetQJMSBridgeLogger.LOGGER.trace("Client acking source 
session");
             }
 
             messages.getLast().acknowledge();
 
             if (JMSBridgeImpl.trace)
             {
-               HornetQJMSServerLogger.LOGGER.trace("Client acked source 
session");
+               HornetQJMSBridgeLogger.LOGGER.trace("Client acked source 
session");
             }
          }
       }
       catch (Exception e)
       {
-         HornetQJMSServerLogger.LOGGER.bridgeAckError(e);
+         if (!stopping)
+         {
+            HornetQJMSBridgeLogger.LOGGER.bridgeAckError(e);
+         }
 
          // We don't call failure otherwise failover would be broken with 
HornetQ
          // We let the ExceptionListener to deal with failures
 
-         try
-         {
-            sourceSession.recover();
-         }
-         catch (Throwable ignored)
+         if (connectedSource)
          {
+            try
+            {
+               sourceSession.recover();
+            }
+            catch (Throwable ignored)
+            {
+            }
          }
 
       }
@@ -1620,14 +1641,14 @@ public final class JMSBridgeImpl implements JMSBridge
 
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Committing JTA transaction");
+            HornetQJMSBridgeLogger.LOGGER.trace("Committing JTA transaction");
          }
 
          tx.commit();
 
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Committed JTA transaction");
+            HornetQJMSBridgeLogger.LOGGER.trace("Committed JTA transaction");
          }
       }
       catch (Exception e)
@@ -1641,7 +1662,7 @@ public final class JMSBridgeImpl implements JMSBridge
          {
          }
 
-         HornetQJMSServerLogger.LOGGER.bridgeAckError(e);
+         HornetQJMSBridgeLogger.LOGGER.bridgeAckError(e);
 
          //we don't do handle failure here because the tx
          //may be rolledback due to failover. All failure handling
@@ -1662,7 +1683,7 @@ public final class JMSBridgeImpl implements JMSBridge
          }
          catch (Exception e)
          {
-            HornetQJMSServerLogger.LOGGER.bridgeAckError(e);
+            HornetQJMSBridgeLogger.LOGGER.bridgeAckError(e);
 
             handleFailureOnSend();
          }
@@ -1677,20 +1698,20 @@ public final class JMSBridgeImpl implements JMSBridge
 
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Committing source session");
+            HornetQJMSBridgeLogger.LOGGER.trace("Committing source session");
          }
 
          sourceSession.commit();
 
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Committed source session");
+            HornetQJMSBridgeLogger.LOGGER.trace("Committed source session");
          }
 
       }
       catch (Exception e)
       {
-         HornetQJMSServerLogger.LOGGER.bridgeAckError(e);
+         HornetQJMSBridgeLogger.LOGGER.bridgeAckError(e);
 
          try
          {
@@ -1732,7 +1753,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Sending message " + msg);
+            HornetQJMSBridgeLogger.LOGGER.trace("Sending message " + msg);
          }
 
          // Make sure the correct time to live gets propagated
@@ -1753,7 +1774,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Sent message " + msg);
+            HornetQJMSBridgeLogger.LOGGER.trace("Sent message " + msg);
          }
       }
    }
@@ -1789,7 +1810,7 @@ public final class JMSBridgeImpl implements JMSBridge
       // distributed request/response
       if (JMSBridgeImpl.trace)
       {
-         HornetQJMSServerLogger.LOGGER.trace("Adding old message id in Message 
header");
+         HornetQJMSBridgeLogger.LOGGER.trace("Adding old message id in Message 
header");
       }
 
       JMSBridgeImpl.copyProperties(msg);
@@ -1893,6 +1914,10 @@ public final class JMSBridgeImpl implements JMSBridge
       {
          while (started)
          {
+            if (stopping)
+            {
+               return;
+            }
             synchronized (lock)
             {
                if (paused || failed)
@@ -1903,6 +1928,10 @@ public final class JMSBridgeImpl implements JMSBridge
                   }
                   catch (InterruptedException e)
                   {
+                     if (stopping)
+                     {
+                        return;
+                     }
                      throw new HornetQInterruptedException(e);
                   }
                   continue;
@@ -1924,7 +1953,7 @@ public final class JMSBridgeImpl implements JMSBridge
                {
                   if (JMSBridgeImpl.trace)
                   {
-                     HornetQJMSServerLogger.LOGGER.trace(this + " exception 
while receiving a message", jmse);
+                     HornetQJMSBridgeLogger.LOGGER.trace(this + " exception 
while receiving a message", jmse);
                   }
                }
 
@@ -1938,7 +1967,11 @@ public final class JMSBridgeImpl implements JMSBridge
                   {
                      if (JMSBridgeImpl.trace)
                      {
-                        HornetQJMSServerLogger.LOGGER.trace(this + " thread 
was interrupted");
+                        HornetQJMSBridgeLogger.LOGGER.trace(this + " thread 
was interrupted");
+                     }
+                     if (stopping)
+                     {
+                        return;
                      }
                      throw new HornetQInterruptedException(e);
                   }
@@ -1947,7 +1980,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
                if (JMSBridgeImpl.trace)
                {
-                  HornetQJMSServerLogger.LOGGER.trace(this + " received 
message " + msg);
+                  HornetQJMSBridgeLogger.LOGGER.trace(this + " received 
message " + msg);
                }
 
                messages.add(msg);
@@ -1956,21 +1989,21 @@ public final class JMSBridgeImpl implements JMSBridge
 
                if (JMSBridgeImpl.trace)
                {
-                  HornetQJMSServerLogger.LOGGER.trace(this + " rescheduled 
batchExpiryTime to " + batchExpiryTime);
+                  HornetQJMSBridgeLogger.LOGGER.trace(this + " rescheduled 
batchExpiryTime to " + batchExpiryTime);
                }
 
                if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
                {
                   if (JMSBridgeImpl.trace)
                   {
-                     HornetQJMSServerLogger.LOGGER.trace(this + " maxBatchSize 
has been reached so sending batch");
+                     HornetQJMSBridgeLogger.LOGGER.trace(this + " maxBatchSize 
has been reached so sending batch");
                   }
 
                   sendBatch();
 
                   if (JMSBridgeImpl.trace)
                   {
-                     HornetQJMSServerLogger.LOGGER.trace(this + " sent batch");
+                     HornetQJMSBridgeLogger.LOGGER.trace(this + " sent batch");
                   }
                }
             }
@@ -1992,14 +2025,15 @@ public final class JMSBridgeImpl implements JMSBridge
          }
          catch (JMSException e)
          {
-            HornetQJMSServerLogger.LOGGER.jmsBridgeSrcConnectError(e);
+            HornetQJMSBridgeLogger.LOGGER.jmsBridgeSrcConnectError(e);
          }
       }
 
       protected void succeeded()
       {
-         HornetQJMSServerLogger.LOGGER.bridgeReconnected();
-
+         HornetQJMSBridgeLogger.LOGGER.bridgeReconnected();
+         connectedSource = true;
+         connectedTarget = true;
          synchronized (lock)
          {
             failed = false;
@@ -2011,7 +2045,7 @@ public final class JMSBridgeImpl implements JMSBridge
       protected void failed()
       {
          // We haven't managed to recreate connections or maxRetries = 0
-         HornetQJMSServerLogger.LOGGER.errorConnectingBridge();
+         HornetQJMSBridgeLogger.LOGGER.errorConnectingBridge();
 
          try
          {
@@ -2026,7 +2060,7 @@ public final class JMSBridgeImpl implements JMSBridge
       {
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace("Failure handler running");
+            HornetQJMSBridgeLogger.LOGGER.trace("Failure handler running");
          }
 
          // Clear the messages
@@ -2038,7 +2072,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
          if (maxRetries > 0 || maxRetries == -1)
          {
-            HornetQJMSServerLogger.LOGGER.bridgeRetry(failureRetryInterval);
+            HornetQJMSBridgeLogger.LOGGER.bridgeRetry(failureRetryInterval);
 
             pause(failureRetryInterval);
 
@@ -2063,17 +2097,20 @@ public final class JMSBridgeImpl implements JMSBridge
       protected void failed()
       {
          // Don't call super
-         HornetQJMSServerLogger.LOGGER.bridgeNotStarted();
+         HornetQJMSBridgeLogger.LOGGER.bridgeNotStarted();
       }
 
       @Override
       protected void succeeded()
       {
          // Don't call super - a bit ugly in this case but better than taking 
the lock twice.
-         HornetQJMSServerLogger.LOGGER.bridgeConnected();
+         HornetQJMSBridgeLogger.LOGGER.bridgeConnected();
 
          synchronized (lock)
          {
+
+            connectedSource = true;
+            connectedTarget = true;
             failed = false;
             started = true;
 
@@ -2086,7 +2123,7 @@ public final class JMSBridgeImpl implements JMSBridge
             }
             catch (JMSException e)
             {
-               HornetQJMSServerLogger.LOGGER.jmsBridgeSrcConnectError(e);
+               HornetQJMSBridgeLogger.LOGGER.jmsBridgeSrcConnectError(e);
             }
          }
       }
@@ -2098,7 +2135,7 @@ public final class JMSBridgeImpl implements JMSBridge
       {
          if (JMSBridgeImpl.trace)
          {
-            HornetQJMSServerLogger.LOGGER.trace(this + " running");
+            HornetQJMSBridgeLogger.LOGGER.trace(this + " running");
          }
 
          synchronized (lock)
@@ -2111,7 +2148,7 @@ public final class JMSBridgeImpl implements JMSBridge
                {
                   if (JMSBridgeImpl.trace)
                   {
-                     HornetQJMSServerLogger.LOGGER.trace(this + " waited 
enough");
+                     HornetQJMSBridgeLogger.LOGGER.trace(this + " waited 
enough");
                   }
 
                   synchronized (lock)
@@ -2120,14 +2157,14 @@ public final class JMSBridgeImpl implements JMSBridge
                      {
                         if (JMSBridgeImpl.trace)
                         {
-                           HornetQJMSServerLogger.LOGGER.trace(this + " got 
some messages so sending batch");
+                           HornetQJMSBridgeLogger.LOGGER.trace(this + " got 
some messages so sending batch");
                         }
 
                         sendBatch();
 
                         if (JMSBridgeImpl.trace)
                         {
-                           HornetQJMSServerLogger.LOGGER.trace(this + " sent 
batch");
+                           HornetQJMSBridgeLogger.LOGGER.trace(this + " sent 
batch");
                         }
                      }
                   }
@@ -2140,21 +2177,25 @@ public final class JMSBridgeImpl implements JMSBridge
                   {
                      if (JMSBridgeImpl.trace)
                      {
-                        HornetQJMSServerLogger.LOGGER.trace(this + " waiting 
for " + toWait);
+                        HornetQJMSBridgeLogger.LOGGER.trace(this + " waiting 
for " + toWait);
                      }
 
                      lock.wait(toWait);
 
                      if (JMSBridgeImpl.trace)
                      {
-                        HornetQJMSServerLogger.LOGGER.trace(this + " woke up");
+                        HornetQJMSBridgeLogger.LOGGER.trace(this + " woke up");
                      }
                   }
                   catch (InterruptedException e)
                   {
                      if (JMSBridgeImpl.trace)
                      {
-                        HornetQJMSServerLogger.LOGGER.trace(this + " thread 
was interrupted");
+                        HornetQJMSBridgeLogger.LOGGER.trace(this + " thread 
was interrupted");
+                     }
+                     if (stopping)
+                     {
+                        return;
                      }
                      throw new HornetQInterruptedException(e);
                   }
@@ -2169,25 +2210,43 @@ public final class JMSBridgeImpl implements JMSBridge
    {
       boolean ha;
       BridgeFailoverListener failoverListener;
+      private final boolean isSource;
 
-      public BridgeExceptionListener(boolean ha, BridgeFailoverListener 
failoverListener)
+      public BridgeExceptionListener(boolean ha, BridgeFailoverListener 
failoverListener, boolean isSource)
       {
          this.ha = ha;
          this.failoverListener = failoverListener;
+         this.isSource = isSource;
       }
 
       public void onException(final JMSException e)
       {
-         HornetQJMSServerLogger.LOGGER.bridgeFailure(e);
+         if (stopping)
+         {
+            return;
+         }
+         HornetQJMSBridgeLogger.LOGGER.bridgeFailure(e);
+         if (isSource)
+         {
+            connectedSource = false;
+         }
+         else
+         {
+            connectedTarget = false;
+         }
 
          synchronized (lock)
          {
+            if (stopping)
+            {
+               return;
+            }
             if (failed)
             {
                // The failure has already been detected and is being handled
                if (JMSBridgeImpl.trace)
                {
-                  HornetQJMSServerLogger.LOGGER.trace("Failure recovery 
already in progress");
+                  HornetQJMSBridgeLogger.LOGGER.trace("Failure recovery 
already in progress");
                }
             }
             else
@@ -2221,7 +2280,7 @@ public final class JMSBridgeImpl implements JMSBridge
             }
             catch (Throwable e)
             {
-               HornetQJMSServerLogger.LOGGER.debug("unable to load  recovery 
registry " + locatorClasse, e);
+               HornetQJMSBridgeLogger.LOGGER.debug("unable to load  recovery 
registry " + locatorClasse, e);
             }
             if (registry != null)
             {
@@ -2231,7 +2290,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
          if (registry != null)
          {
-            HornetQJMSServerLogger.LOGGER.debug("Recovery Registry located = " 
+ registry);
+            HornetQJMSBridgeLogger.LOGGER.debug("Recovery Registry located = " 
+ registry);
          }
       }
    }
@@ -2274,14 +2333,31 @@ public final class JMSBridgeImpl implements JMSBridge
 
    private class BridgeFailoverListener implements FailoverEventListener
    {
+      private final boolean isSource;
       volatile FailoverEventType lastEvent;
 
+      public BridgeFailoverListener(boolean isSource)
+      {
+         this.isSource = isSource;
+      }
+
       @Override
       public void failoverEvent(FailoverEventType eventType)
       {
          synchronized (this)
          {
             lastEvent = eventType;
+            if (eventType == FailoverEventType.FAILURE_DETECTED)
+            {
+               if (isSource)
+               {
+                  connectedSource = false;
+               }
+               else
+               {
+                  connectedTarget = false;
+               }
+            }
             this.notify();
          }
       }
@@ -2324,12 +2400,23 @@ public final class JMSBridgeImpl implements JMSBridge
          if (timedOut)
          {
             //timeout, presumably failover failed.
-            HornetQJMSServerLogger.LOGGER.debug("Timed out waiting for 
failover completion " + this);
+            HornetQJMSBridgeLogger.LOGGER.debug("Timed out waiting for 
failover completion " + this);
             return false;
          }
 
+         /*
+         * make sure we reset the connected flags
+         * */
          if (result == FailoverEventType.FAILOVER_COMPLETED)
          {
+            if (isSource)
+            {
+               connectedSource = true;
+            }
+            else
+            {
+               connectedTarget = true;
+            }
             return true;
          }
          //failover failed, need retry.

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
index 6d646c9..9c597e2 100644
--- 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
+++ 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
@@ -362,6 +362,11 @@ public class JMSQueueControlImpl extends StandardMBean 
implements JMSQueueContro
       return coreQueueControl.getFilter();
    }
 
+   public void flushExecutor()
+   {
+      coreQueueControl.flushExecutor();
+   }
+
    @Override
    public MBeanInfo getMBeanInfo()
    {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java
 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java
index db3e812..f575b1d 100644
--- 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java
+++ 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java
@@ -21,6 +21,7 @@ import javax.management.NotificationBroadcasterSupport;
 import javax.management.NotificationEmitter;
 import javax.management.NotificationFilter;
 import javax.management.NotificationListener;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -28,7 +29,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.management.Parameter;
 import org.hornetq.api.jms.JMSFactoryType;
 import org.hornetq.api.jms.management.ConnectionFactoryControl;
@@ -46,7 +46,9 @@ import org.hornetq.jms.server.HornetQJMSServerLogger;
 import org.hornetq.jms.server.JMSServerManager;
 import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
 import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+import org.hornetq.jms.server.management.JMSNotificationType;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.utils.TypedProperties;
 import org.hornetq.utils.json.JSONArray;
 import org.hornetq.utils.json.JSONObject;
 
@@ -54,7 +56,8 @@ import org.hornetq.utils.json.JSONObject;
  * @author <a href="mailto:[email protected]";>Jeff Mesnil</a>
  * @author <a href="mailto:[email protected]";>Tim Fox</a>
  */
-public class JMSServerControlImpl extends AbstractControl implements 
JMSServerControl, NotificationEmitter
+public class JMSServerControlImpl extends AbstractControl implements 
JMSServerControl, NotificationEmitter,
+                                                                     
org.hornetq.core.server.management.NotificationListener
 {
 
    // Constants -----------------------------------------------------
@@ -129,7 +132,7 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
 
    public static MBeanNotificationInfo[] getNotificationInfos()
    {
-      NotificationType[] values = NotificationType.values();
+      JMSNotificationType[] values = JMSNotificationType.values();
       String[] names = new String[values.length];
       for (int i = 0; i < values.length; i++)
       {
@@ -147,6 +150,7 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
       super(JMSServerControl.class, 
server.getHornetQServer().getStorageManager());
       this.server = server;
       broadcaster = new NotificationBroadcasterSupport();
+      
server.getHornetQServer().getManagementService().addNotificationListener(this);
    }
 
    // Public --------------------------------------------------------
@@ -196,8 +200,6 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
                                            connectorList,
                                            
JMSServerControlImpl.convert(bindings));
          }
-
-         sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
       }
       finally
       {
@@ -322,7 +324,39 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
 
       try
       {
-         ConnectionFactoryConfiguration configuration = new 
ConnectionFactoryConfigurationImpl(name, ha, bindings);
+         ConnectionFactoryConfiguration configuration = new 
ConnectionFactoryConfigurationImpl()
+            .setName(name)
+            .setHA(ha)
+            .setBindings(bindings)
+            .setFactoryType(JMSFactoryType.valueOf(cfType))
+            .setClientID(clientID)
+            .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+            .setConnectionTTL(connectionTTL)
+            .setCallTimeout(callTimeout)
+            .setCallFailoverTimeout(callFailoverTimeout)
+            .setMinLargeMessageSize(minLargeMessageSize)
+            .setCompressLargeMessages(compressLargeMessages)
+            .setConsumerWindowSize(consumerWindowSize)
+            .setConsumerMaxRate(consumerMaxRate)
+            .setConfirmationWindowSize(confirmationWindowSize)
+            .setProducerWindowSize(producerWindowSize)
+            .setProducerMaxRate(producerMaxRate)
+            .setBlockOnAcknowledge(blockOnAcknowledge)
+            .setBlockOnDurableSend(blockOnDurableSend)
+            .setBlockOnNonDurableSend(blockOnNonDurableSend)
+            .setAutoGroup(autoGroup)
+            .setPreAcknowledge(preAcknowledge)
+            .setTransactionBatchSize(transactionBatchSize)
+            .setDupsOKBatchSize(dupsOKBatchSize)
+            .setUseGlobalPools(useGlobalPools)
+            .setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize)
+            .setThreadPoolMaxSize(threadPoolMaxSize)
+            .setRetryInterval(retryInterval)
+            .setRetryIntervalMultiplier(retryIntervalMultiplier)
+            .setMaxRetryInterval(maxRetryInterval)
+            .setReconnectAttempts(reconnectAttempts)
+            .setFailoverOnInitialConnection(failoverOnInitialConnection)
+            .setGroupID(groupId);
 
          if (useDiscovery)
          {
@@ -338,46 +372,12 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
             configuration.setConnectorNames(connectorNamesList);
          }
 
-         configuration.setFactoryType(JMSFactoryType.valueOf(cfType));
-         configuration.setClientID(clientID);
-         configuration.setClientFailureCheckPeriod(clientFailureCheckPeriod);
-         configuration.setConnectionTTL(connectionTTL);
-         configuration.setCallTimeout(callTimeout);
-         configuration.setCallFailoverTimeout(callFailoverTimeout);
-         configuration.setMinLargeMessageSize(minLargeMessageSize);
-         configuration.setCompressLargeMessages(compressLargeMessages);
-         configuration.setConsumerWindowSize(consumerWindowSize);
-         configuration.setConsumerMaxRate(consumerMaxRate);
-         configuration.setConfirmationWindowSize(confirmationWindowSize);
-         configuration.setProducerWindowSize(producerWindowSize);
-         configuration.setProducerMaxRate(producerMaxRate);
-         configuration.setBlockOnAcknowledge(blockOnAcknowledge);
-         configuration.setBlockOnDurableSend(blockOnDurableSend);
-         configuration.setBlockOnNonDurableSend(blockOnNonDurableSend);
-         configuration.setAutoGroup(autoGroup);
-         configuration.setPreAcknowledge(preAcknowledge);
-
-         if (loadBalancingPolicyClassName == null || 
loadBalancingPolicyClassName.trim().equals(""))
+         if (loadBalancingPolicyClassName != null && 
!loadBalancingPolicyClassName.trim().equals(""))
          {
-            loadBalancingPolicyClassName = 
HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+            
configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
          }
 
-         
configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
-         configuration.setTransactionBatchSize(transactionBatchSize);
-         configuration.setDupsOKBatchSize(dupsOKBatchSize);
-         configuration.setUseGlobalPools(useGlobalPools);
-         
configuration.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
-         configuration.setThreadPoolMaxSize(threadPoolMaxSize);
-         configuration.setRetryInterval(retryInterval);
-         configuration.setRetryIntervalMultiplier(retryIntervalMultiplier);
-         configuration.setMaxRetryInterval(maxRetryInterval);
-         configuration.setReconnectAttempts(reconnectAttempts);
-         
configuration.setFailoverOnInitialConnection(failoverOnInitialConnection);
-         configuration.setGroupID(groupId);
-
          server.createConnectionFactory(true, configuration, bindings);
-
-         sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
       }
       finally
       {
@@ -427,12 +427,8 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
 
       try
       {
-         boolean created = server.createQueue(true, name, selector, durable, 
JMSServerControlImpl.toArray(jndiBindings));
-         if (created)
-         {
-            sendNotification(NotificationType.QUEUE_CREATED, name);
-         }
-         return created;
+         return server.createQueue(true, name, selector, durable,
+               JMSServerControlImpl.toArray(jndiBindings));
       }
       finally
       {
@@ -453,12 +449,7 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
 
       try
       {
-         boolean destroyed = server.destroyQueue(name, removeConsumers);
-         if (destroyed)
-         {
-            sendNotification(NotificationType.QUEUE_DESTROYED, name);
-         }
-         return destroyed;
+         return server.destroyQueue(name, removeConsumers);
       }
       finally
       {
@@ -479,12 +470,7 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
 
       try
       {
-         boolean created = server.createTopic(true, topicName, 
JMSServerControlImpl.toArray(jndiBindings));
-         if (created)
-         {
-            sendNotification(NotificationType.TOPIC_CREATED, topicName);
-         }
-         return created;
+         return server.createTopic(true, topicName, 
JMSServerControlImpl.toArray(jndiBindings));
       }
       finally
       {
@@ -506,12 +492,7 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
 
       try
       {
-         boolean destroyed = server.destroyTopic(name, removeConsumers);
-         if (destroyed)
-         {
-            sendNotification(NotificationType.TOPIC_DESTROYED, name);
-         }
-         return destroyed;
+         return server.destroyTopic(name, removeConsumers);
       }
       finally
       {
@@ -527,11 +508,7 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
 
       try
       {
-         boolean destroyed = server.destroyConnectionFactory(name);
-         if (destroyed)
-         {
-            sendNotification(NotificationType.CONNECTION_FACTORY_DESTROYED, 
name);
-         }
+         server.destroyConnectionFactory(name);
       }
       finally
       {
@@ -696,6 +673,38 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
       }
    }
 
+   public boolean closeConsumerConnectionsForAddress(final String address) 
throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.closeConsumerConnectionsForAddress(address);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public boolean closeConnectionsForUser(final String userName) throws 
Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.closeConnectionsForUser(userName);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
    public String[] listConnectionIDs() throws Exception
    {
       checkStarted();
@@ -889,12 +898,6 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
 
    // Private -------------------------------------------------------
 
-   private void sendNotification(final NotificationType type, final String 
message)
-   {
-      Notification notif = new Notification(type.toString(), this, 
notifSeq.incrementAndGet(), message);
-      broadcaster.sendNotification(notif);
-   }
-
    private void checkStarted()
    {
       if (!server.isStarted())
@@ -905,16 +908,6 @@ public class JMSServerControlImpl extends AbstractControl 
implements JMSServerCo
 
    // Inner classes -------------------------------------------------
 
-   public static enum NotificationType
-   {
-      QUEUE_CREATED,
-      QUEUE_DESTROYED,
-      TOPIC_CREATED,
-      TOPIC_DESTROYED,
-      CONNECTION_FACTORY_CREATED,
-      CONNECTION_FACTORY_DESTROYED;
-   }
-
    public String[] listTargetDestinations(String sessionID) throws Exception
    {
       String[] addresses = 
server.getHornetQServer().getHornetQServerControl().listTargetAddresses(sessionID);
@@ -1041,4 +1034,16 @@ public class JMSServerControlImpl extends 
AbstractControl implements JMSServerCo
 
       return obj;
    }
+
+   @Override
+   public void onNotification(org.hornetq.core.server.management.Notification 
notification)
+   {
+      if (!(notification.getType() instanceof JMSNotificationType)) return;
+      JMSNotificationType type = (JMSNotificationType) notification.getType();
+      TypedProperties prop = notification.getProperties();
+
+      this.broadcaster.sendNotification(new Notification(type.toString(), this,
+            notifSeq.incrementAndGet(), 
prop.getSimpleStringProperty(JMSNotificationType.MESSAGE).toString()));
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
index bdeceeb..f38ba8a 100644
--- 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
+++ 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
@@ -315,13 +315,21 @@ public class JMSTopicControlImpl extends StandardMBean 
implements TopicControl
             String clientID = null;
             String subName = null;
 
-            if (queue.isDurable())
+            if (queue.isDurable() && 
!queue.getName().startsWith(ResourceNames.JMS_TOPIC))
             {
                Pair<String, String> pair = 
HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName()
                                                                                
                             .toString());
                clientID = pair.getA();
                subName = pair.getB();
             }
+            else if (queue.getName().startsWith(ResourceNames.JMS_TOPIC))
+            {
+               // in the case of heirarchical topics the queue name will not 
follow the <part>.<part> pattern of normal
+               // durable subscribers so skip decomposing the name for the 
client ID and subscription name and just
+               // hard-code it
+               clientID = "HornetQ";
+               subName = "HornetQ";
+            }
 
             String filter = queue.getFilter() != null ? queue.getFilter() : 
null;
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/HornetQJMSServerLogger.java
----------------------------------------------------------------------
diff --git 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/HornetQJMSServerLogger.java
 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/HornetQJMSServerLogger.java
index e5c79f3..49536e2 100644
--- 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/HornetQJMSServerLogger.java
+++ 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/HornetQJMSServerLogger.java
@@ -12,8 +12,6 @@
  */
 package org.hornetq.jms.server;
 
-import javax.management.ObjectName;
-
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.jms.server.recovery.XARecoveryConfig;
 import org.jboss.logging.BasicLogger;
@@ -26,7 +24,7 @@ import org.w3c.dom.Node;
 
 /**
  * @author <a href="mailto:[email protected]";>Andy Taylor</a>
- *         3/15/12
+ * @author <a href="mailto:[email protected]";>Martyn Taylor</a>
  *
  * Logger Code 12
  *
@@ -50,19 +48,6 @@ public interface HornetQJMSServerLogger extends BasicLogger
    HornetQJMSServerLogger LOGGER = 
Logger.getMessageLogger(HornetQJMSServerLogger.class, 
HornetQJMSServerLogger.class.getPackage().getName());
 
    @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 121000, value = "Failed to set up JMS bridge connections. 
Most probably the source or target servers are unavailable." +
-         " Will retry after a pause of {0} ms", format = 
Message.Format.MESSAGE_FORMAT)
-   void failedToSetUpBridge(long failureRetryInterval);
-
-   @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 121001, value = "JMS Bridge Succeeded in reconnecting to 
servers" , format = Message.Format.MESSAGE_FORMAT)
-   void bridgeReconnected();
-
-   @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 121002, value = "Succeeded in connecting to servers" , format 
= Message.Format.MESSAGE_FORMAT)
-   void bridgeConnected();
-
-   @LogMessage(level = Logger.Level.INFO)
    @Message(id = 121003, value = "JMS Server Manager Running cached command 
for {0}" , format = Message.Format.MESSAGE_FORMAT)
    void serverRunningCachedCommand(Runnable run);
 
@@ -77,34 +62,6 @@ public interface HornetQJMSServerLogger extends BasicLogger
    void invalidHostForConnector(String name, String newHost);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 122000, value = "Attempt to start JMS Bridge, but is already 
started" , format = Message.Format.MESSAGE_FORMAT)
-   void errorBridgeAlreadyStarted();
-
-   @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 122001, value = "Failed to start JMS Bridge" , format = 
Message.Format.MESSAGE_FORMAT)
-   void errorStartingBridge();
-
-   @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 122002, value = "Failed to unregisted JMS Bridge {0}" , 
format = Message.Format.MESSAGE_FORMAT)
-   void errorUnregisteringBridge(ObjectName objectName);
-
-   @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 122003, value = "JMS Bridge unable to set up connections, 
bridge will be stopped" , format = Message.Format.MESSAGE_FORMAT)
-   void errorConnectingBridge();
-
-   @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 122004, value = "JMS Bridge Will retry after a pause of {0} 
ms" , format = Message.Format.MESSAGE_FORMAT)
-   void bridgeRetry(long failureRetryInterval);
-
-   @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 122005, value = "JMS Bridge unable to set up connections, 
bridge will not be started" , format = Message.Format.MESSAGE_FORMAT)
-   void bridgeNotStarted();
-
-   @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 122006, value = "Detected failure on bridge connection" , 
format = Message.Format.MESSAGE_FORMAT)
-   void bridgeFailure(@Cause Exception e);
-
-   @LogMessage(level = Logger.Level.WARN)
    @Message(id = 122007, value = "Queue {0} does not exist on the topic {1}. 
It was deleted manually probably." , format = Message.Format.MESSAGE_FORMAT)
    void noQueueOnTopic(String queueName, String name);
 
@@ -113,14 +70,6 @@ public interface HornetQJMSServerLogger extends BasicLogger
    void recoveryConnectFailed(String s);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 122009, value = "JMS Bridge failed to send + acknowledge 
batch, closing JMS objects"  , format = Message.Format.MESSAGE_FORMAT)
-   void bridgeAckError(@Cause Exception e);
-
-   @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 122010, value = "Failed to connect JMS Bridge", format = 
Message.Format.MESSAGE_FORMAT)
-   void bridgeConnectError(@Cause Exception e);
-
-   @LogMessage(level = Logger.Level.WARN)
    @Message(id = 122011, value = "error unbinding {0} from JNDI" , format = 
Message.Format.MESSAGE_FORMAT)
    void jndiUnbindError(@Cause Exception e, String key);
 
@@ -161,10 +110,6 @@ public interface HornetQJMSServerLogger extends BasicLogger
    void jmsConfigMissingKey(Node e);
 
    @LogMessage(level = Logger.Level.ERROR)
-   @Message(id = 124001, value = "Failed to start source connection" , format 
= Message.Format.MESSAGE_FORMAT)
-   void jmsBridgeSrcConnectError(@Cause Exception e);
-
-   @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 124002, value = "Failed to start JMS deployer" , format = 
Message.Format.MESSAGE_FORMAT)
    void jmsDeployerStartError(@Cause Exception e);
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/JMSServerManager.java
----------------------------------------------------------------------
diff --git 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/JMSServerManager.java 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/JMSServerManager.java
index 35ef0a9..f593077 100644
--- 
a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/JMSServerManager.java
+++ 
b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/JMSServerManager.java
@@ -284,6 +284,10 @@ public interface JMSServerManager extends HornetQComponent
 
    boolean closeConnectionsForAddress(String ipAddress) throws Exception;
 
+   boolean closeConsumerConnectionsForAddress(String address) throws Exception;
+
+   boolean closeConnectionsForUser(String address) throws Exception;
+
    String[] listConnectionIDs() throws Exception;
 
    String[] listSessions(String connectionID) throws Exception;

Reply via email to