http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 37564b5..f5756f2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -41,6 +41,7 @@ import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
@@ -76,11 +77,13 @@ import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
@@ -313,7 +316,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                   requiresResponse = message.isRequiresResponse();
                   sendContinuations(message.getPacketSize(), 
message.getMessageBodySize(), message.getBody(), message.isContinues());
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -342,7 +345,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                   requiresResponse = request.isRequiresResponse();
                   session.createAddress(request.getAddress(), 
request.getRoutingTypes(), request.isAutoCreated());
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -351,7 +354,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                   requiresResponse = request.isRequiresResponse();
                   session.createQueue(request.getAddress(), 
request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), 
request.isTemporary(), request.isDurable());
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -361,7 +364,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                   session.createQueue(request.getAddress(), 
request.getQueueName(), request.getRoutingType(), request.getFilterString(), 
request.isTemporary(), request.isDurable(), request.getMaxConsumers(), 
request.isPurgeOnNoConsumers(),
                                       request.isExclusive(), 
request.isLastValue(), request.isAutoCreated());
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -373,7 +376,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                      session.createSharedQueue(request.getAddress(), 
request.getQueueName(), request.isDurable(), request.getFilterString());
                   }
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -385,7 +388,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                      session.createSharedQueue(request.getAddress(), 
request.getQueueName(), request.getRoutingType(), request.getFilterString(), 
request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), 
request.isExclusive(), request.isLastValue());
                   }
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -393,7 +396,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                   requiresResponse = true;
                   SessionDeleteQueueMessage request = 
(SessionDeleteQueueMessage) packet;
                   session.deleteQueue(request.getQueueName());
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   break;
                }
                case SESS_QUEUEQUERY: {
@@ -453,62 +456,62 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                case SESS_COMMIT: {
                   requiresResponse = true;
                   session.commit();
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   break;
                }
                case SESS_ROLLBACK: {
                   requiresResponse = true;
                   session.rollback(((RollbackMessage) 
packet).isConsiderLastMessageAsDelivered());
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   break;
                }
                case SESS_XA_COMMIT: {
                   requiresResponse = true;
                   SessionXACommitMessage message = (SessionXACommitMessage) 
packet;
                   session.xaCommit(message.getXid(), message.isOnePhase());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_END: {
                   requiresResponse = true;
                   SessionXAEndMessage message = (SessionXAEndMessage) packet;
                   session.xaEnd(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_FORGET: {
                   requiresResponse = true;
                   SessionXAForgetMessage message = (SessionXAForgetMessage) 
packet;
                   session.xaForget(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_JOIN: {
                   requiresResponse = true;
                   SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
                   session.xaJoin(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_RESUME: {
                   requiresResponse = true;
                   SessionXAResumeMessage message = (SessionXAResumeMessage) 
packet;
                   session.xaResume(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_ROLLBACK: {
                   requiresResponse = true;
                   SessionXARollbackMessage message = 
(SessionXARollbackMessage) packet;
                   session.xaRollback(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_START: {
                   requiresResponse = true;
                   SessionXAStartMessage message = (SessionXAStartMessage) 
packet;
                   session.xaStart(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_FAILED: {
@@ -521,14 +524,14 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                case SESS_XA_SUSPEND: {
                   requiresResponse = true;
                   session.xaSuspend();
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_PREPARE: {
                   requiresResponse = true;
                   SessionXAPrepareMessage message = (SessionXAPrepareMessage) 
packet;
                   session.xaPrepare(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_INDOUBT_XIDS: {
@@ -557,14 +560,14 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                case SESS_STOP: {
                   requiresResponse = true;
                   session.stop();
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   break;
                }
                case SESS_CLOSE: {
                   requiresResponse = true;
                   session.close(false);
                   // removeConnectionListeners();
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   flush = true;
                   closeChannel = true;
                   break;
@@ -574,7 +577,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                   requiresResponse = message.isRequiresResponse();
                   session.individualAcknowledge(message.getConsumerID(), 
message.getMessageID());
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -582,7 +585,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                   requiresResponse = true;
                   SessionConsumerCloseMessage message = 
(SessionConsumerCloseMessage) packet;
                   session.closeConsumer(message.getConsumerID());
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   break;
                }
                case SESS_FORCE_CONSUMER_DELIVERY: {
@@ -591,7 +594,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                   break;
                }
                case PacketImpl.SESS_ADD_METADATA: {
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   SessionAddMetaDataMessage message = 
(SessionAddMetaDataMessage) packet;
                   session.addMetaData(message.getKey(), message.getData());
                   break;
@@ -600,7 +603,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                   requiresResponse = true;
                   SessionAddMetaDataMessageV2 message = 
(SessionAddMetaDataMessageV2) packet;
                   if (message.isRequiresConfirmations()) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   session.addMetaData(message.getKey(), message.getData());
                   break;
@@ -609,7 +612,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                   requiresResponse = true;
                   SessionUniqueAddMetaDataMessage message = 
(SessionUniqueAddMetaDataMessage) packet;
                   if (session.addUniqueMetaData(message.getKey(), 
message.getData())) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   } else {
                      response = new 
ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(),
 message.getData()));
                   }
@@ -617,15 +620,15 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                }
             }
          } catch (ActiveMQIOErrorException e) {
-            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, 
requiresResponse, response, this.session);
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, 
requiresResponse, response, this.session);
          } catch (ActiveMQXAException e) {
-            response = onActiveMQXAExceptionWhileHandlePacket(e, 
requiresResponse, response);
+            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (ActiveMQQueueMaxConsumerLimitReached e) {
-            response = 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, 
response);
+            response = 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (ActiveMQException e) {
-            response = onActiveMQExceptionWhileHandlePacket(e, 
requiresResponse, response);
+            response = onActiveMQExceptionWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (Throwable t) {
-            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, 
response, this.session);
+            response = onCatchThrowableWhileHandlePacket(packet, t, 
requiresResponse, response, this.session);
          }
          sendResponse(packet, response, flush, closeChannel);
       } finally {
@@ -633,6 +636,26 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
       }
    }
 
+   private Packet createNullResponseMessage(Packet packet) {
+      final Packet response;
+      if (!packet.isResponseAsync() || 
channel.getConnection().isVersionBeforeAsyncResponseChange()) {
+         response = new NullResponseMessage();
+      } else {
+         response = new NullResponseMessage_V2(packet.getCorrelationID());
+      }
+      return response;
+   }
+
+   private Packet createSessionXAResponseMessage(Packet packet) {
+      Packet response;
+      if (packet.isResponseAsync()) {
+         response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), 
false, XAResource.XA_OK, null);
+      } else {
+         response = new SessionXAResponseMessage(false, XAResource.XA_OK, 
null);
+      }
+      return response;
+   }
+
    private void onSessionAcknowledge(Packet packet) {
       this.storageManager.setContext(session.getSessionContext());
       try {
@@ -643,18 +666,18 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
             requiresResponse = message.isRequiresResponse();
             this.session.acknowledge(message.getConsumerID(), 
message.getMessageID());
             if (requiresResponse) {
-               response = new NullResponseMessage();
+               response = createNullResponseMessage(packet);
             }
          } catch (ActiveMQIOErrorException e) {
-            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, 
requiresResponse, response, this.session);
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, 
requiresResponse, response, this.session);
          } catch (ActiveMQXAException e) {
-            response = onActiveMQXAExceptionWhileHandlePacket(e, 
requiresResponse, response);
+            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (ActiveMQQueueMaxConsumerLimitReached e) {
-            response = 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, 
response);
+            response = 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (ActiveMQException e) {
-            response = onActiveMQExceptionWhileHandlePacket(e, 
requiresResponse, response);
+            response = onActiveMQExceptionWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (Throwable t) {
-            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, 
response, this.session);
+            response = onCatchThrowableWhileHandlePacket(packet, t, 
requiresResponse, response, this.session);
          }
          sendResponse(packet, response, false, false);
       } finally {
@@ -672,18 +695,18 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
             requiresResponse = message.isRequiresResponse();
             
this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), 
this.direct);
             if (requiresResponse) {
-               response = new NullResponseMessage();
+               response = createNullResponseMessage(packet);
             }
          } catch (ActiveMQIOErrorException e) {
-            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, 
requiresResponse, response, this.session);
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, 
requiresResponse, response, this.session);
          } catch (ActiveMQXAException e) {
-            response = onActiveMQXAExceptionWhileHandlePacket(e, 
requiresResponse, response);
+            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (ActiveMQQueueMaxConsumerLimitReached e) {
-            response = 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, 
response);
+            response = 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (ActiveMQException e) {
-            response = onActiveMQExceptionWhileHandlePacket(e, 
requiresResponse, response);
+            response = onActiveMQExceptionWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (Throwable t) {
-            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, 
response, this.session);
+            response = onCatchThrowableWhileHandlePacket(packet, t, 
requiresResponse, response, this.session);
          }
          sendResponse(packet, response, false, false);
       } finally {
@@ -700,15 +723,15 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
             SessionRequestProducerCreditsMessage message = 
(SessionRequestProducerCreditsMessage) packet;
             session.requestProducerCredits(message.getAddress(), 
message.getCredits());
          } catch (ActiveMQIOErrorException e) {
-            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, 
requiresResponse, response, this.session);
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, 
requiresResponse, response, this.session);
          } catch (ActiveMQXAException e) {
-            response = onActiveMQXAExceptionWhileHandlePacket(e, 
requiresResponse, response);
+            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (ActiveMQQueueMaxConsumerLimitReached e) {
-            response = 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, 
response);
+            response = 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (ActiveMQException e) {
-            response = onActiveMQExceptionWhileHandlePacket(e, 
requiresResponse, response);
+            response = onActiveMQExceptionWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (Throwable t) {
-            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, 
response, this.session);
+            response = onCatchThrowableWhileHandlePacket(packet, t, 
requiresResponse, response, this.session);
          }
          sendResponse(packet, response, false, false);
       } finally {
@@ -725,15 +748,15 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
             SessionConsumerFlowCreditMessage message = 
(SessionConsumerFlowCreditMessage) packet;
             session.receiveConsumerCredits(message.getConsumerID(), 
message.getCredits());
          } catch (ActiveMQIOErrorException e) {
-            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, 
requiresResponse, response, this.session);
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, 
requiresResponse, response, this.session);
          } catch (ActiveMQXAException e) {
-            response = onActiveMQXAExceptionWhileHandlePacket(e, 
requiresResponse, response);
+            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (ActiveMQQueueMaxConsumerLimitReached e) {
-            response = 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, 
response);
+            response = 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (ActiveMQException e) {
-            response = onActiveMQExceptionWhileHandlePacket(e, 
requiresResponse, response);
+            response = onActiveMQExceptionWhileHandlePacket(packet, e, 
requiresResponse, response);
          } catch (Throwable t) {
-            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, 
response, this.session);
+            response = onCatchThrowableWhileHandlePacket(packet, t, 
requiresResponse, response, this.session);
          }
          sendResponse(packet, response, false, false);
       } finally {
@@ -742,50 +765,68 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
    }
 
 
-   private static Packet 
onActiveMQIOErrorExceptionWhileHandlePacket(ActiveMQIOErrorException e,
+   private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(Packet 
packet,
+                                                                     
ActiveMQIOErrorException e,
                                                                      boolean 
requiresResponse,
                                                                      Packet 
response,
                                                                      
ServerSession session) {
       session.markTXFailed(e);
       if (requiresResponse) {
          logger.debug("Sending exception to client", e);
-         response = new ActiveMQExceptionMessage(e);
+         response = convertToExceptionPacket(packet, e);
       } else {
          ActiveMQServerLogger.LOGGER.caughtException(e);
       }
       return response;
    }
 
-   private static Packet 
onActiveMQXAExceptionWhileHandlePacket(ActiveMQXAException e,
+   private static Packet onActiveMQXAExceptionWhileHandlePacket(Packet packet,
+                                                                
ActiveMQXAException e,
                                                                 boolean 
requiresResponse,
                                                                 Packet 
response) {
       if (requiresResponse) {
          logger.debug("Sending exception to client", e);
-         response = new SessionXAResponseMessage(true, e.errorCode, 
e.getMessage());
+         if (packet.isResponseAsync()) {
+            response = new 
SessionXAResponseMessage_V2(packet.getCorrelationID(), true, e.errorCode, 
e.getMessage());
+         } else {
+            response = new SessionXAResponseMessage(true, e.errorCode, 
e.getMessage());
+         }
       } else {
          ActiveMQServerLogger.LOGGER.caughtXaException(e);
       }
       return response;
    }
 
-   private static Packet 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(ActiveMQQueueMaxConsumerLimitReached
 e,
+   private static Packet 
onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(Packet packet,
+                                                                               
  ActiveMQQueueMaxConsumerLimitReached e,
                                                                                
  boolean requiresResponse,
                                                                                
  Packet response) {
       if (requiresResponse) {
          logger.debug("Sending exception to client", e);
-         response = new ActiveMQExceptionMessage(e);
+         response = convertToExceptionPacket(packet, e);
       } else {
          ActiveMQServerLogger.LOGGER.caughtException(e);
       }
       return response;
    }
 
-   private static Packet 
onActiveMQExceptionWhileHandlePacket(ActiveMQException e,
+   private static Packet convertToExceptionPacket(Packet packet, 
ActiveMQException e) {
+      Packet response;
+      if (packet.isResponseAsync()) {
+         response = new ActiveMQExceptionMessage_V2(packet.getCorrelationID(), 
e);
+      } else {
+         response = new ActiveMQExceptionMessage(e);
+      }
+      return response;
+   }
+
+   private static Packet onActiveMQExceptionWhileHandlePacket(Packet packet,
+                                                              
ActiveMQException e,
                                                               boolean 
requiresResponse,
                                                               Packet response) 
{
       if (requiresResponse) {
          logger.debug("Sending exception to client", e);
-         response = new ActiveMQExceptionMessage(e);
+         response = convertToExceptionPacket(packet, e);
       } else {
          if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
             logger.debug("Caught exception", e);
@@ -796,7 +837,8 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
       return response;
    }
 
-   private static Packet onCatchThrowableWhileHandlePacket(Throwable t,
+   private static Packet onCatchThrowableWhileHandlePacket(Packet packet,
+                                                           Throwable t,
                                                            boolean 
requiresResponse,
                                                            Packet response,
                                                            ServerSession 
session) {
@@ -805,7 +847,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
          ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t);
          ActiveMQException activeMQInternalErrorException = new 
ActiveMQInternalErrorException();
          activeMQInternalErrorException.initCause(t);
-         response = new 
ActiveMQExceptionMessage(activeMQInternalErrorException);
+         response = convertToExceptionPacket(packet, 
activeMQInternalErrorException);
       } else {
          ActiveMQServerLogger.LOGGER.caughtException(t);
       }
@@ -827,12 +869,11 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
          public void onError(final int errorCode, final String errorMessage) {
             ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, 
errorMessage);
 
-            ActiveMQExceptionMessage exceptionMessage = new 
ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, 
errorMessage));
-
-            doConfirmAndResponse(confirmPacket, exceptionMessage, flush, 
closeChannel);
+            Packet exceptionPacket = convertToExceptionPacket(confirmPacket, 
ActiveMQExceptionType.createException(errorCode, errorMessage));
+            doConfirmAndResponse(confirmPacket, exceptionPacket, flush, 
closeChannel);
 
             if (logger.isTraceEnabled()) {
-               logger.trace("ServerSessionPacketHandler::exception response 
sent::" + exceptionMessage);
+               logger.trace("ServerSessionPacketHandler::exception response 
sent::" + exceptionPacket);
             }
 
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 36e5c33..7acfe0d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,7 +126,7 @@
       <activemq.version.majorVersion>1</activemq.version.majorVersion>
       <activemq.version.minorVersion>0</activemq.version.minorVersion>
       <activemq.version.microVersion>0</activemq.version.microVersion>
-      
<activemq.version.incrementingVersion>129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
+      
<activemq.version.incrementingVersion>130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
       
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
       
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index e4afb5b..c7ed869 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -25,6 +25,7 @@ import 
org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
 import 
org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
@@ -315,6 +316,11 @@ public class BackupSyncDelay implements Interceptor {
       }
 
       @Override
+      public void setResponseHandler(ResponseHandler handler) {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
       public void flushConfirmations() {
          throw new UnsupportedOperationException();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
index d3951f2..3020310 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
@@ -167,7 +167,24 @@ public class JmsProducerCompletionListenerTest extends 
JMSTestBase {
 
       @Override
       public void onException(Message message, Exception exception) {
-         // TODO Auto-generated method stub
+         latch.countDown();
+         try {
+            switch (call) {
+               case 0:
+                  context.rollback();
+                  break;
+               case 1:
+                  context.commit();
+                  break;
+               case 2:
+                  context.close();
+                  break;
+               default:
+                  throw new IllegalArgumentException("call code " + call);
+            }
+         } catch (Exception error1) {
+            this.error = error1;
+         }
       }
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
----------------------------------------------------------------------
diff --git 
a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
 
b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
index e83d815..0fd469c 100644
--- 
a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
+++ 
b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
@@ -16,12 +16,23 @@
  */
 package org.apache.activemq.artemis.jms.tests;
 
+import static org.junit.Assert.fail;
+
+import javax.jms.CompletionListener;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 import javax.jms.IllegalStateException;
 import javax.jms.JMSSecurityException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
 import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
@@ -68,9 +79,9 @@ public class SecurityTest extends JMSTestCase {
    }
 
 
-      /**
-       * Login with no user, no password Should allow login (equivalent to 
guest)
-       */
+   /**
+    * Login with no user, no password Should allow login (equivalent to guest)
+    */
    @Test
    public void testLoginNoUserNoPassword() throws Exception {
       createConnection();
@@ -170,6 +181,71 @@ public class SecurityTest extends JMSTestCase {
       }
    }
 
+   /**
+    * Login with valid user and password
+    * But try send to address not authorised - Persistent
+    * Should not allow and should throw exception
+    */
+   @Test
+   public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws 
Exception {
+      ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("tcp://localhost:61616");
+      Connection connection = connectionFactory.createConnection("guest", 
"guest");
+      Session session = connection.createSession();
+      Destination destination = session.createQueue("guest.cannot.send");
+      MessageProducer messageProducer = session.createProducer(destination);
+      try {
+         messageProducer.send(session.createTextMessage("hello"));
+         fail("JMSSecurityException expected as guest is not allowed to send");
+      } catch (JMSSecurityException activeMQSecurityException) {
+         //pass
+      }
+      connection.close();
+   }
+
+   /**
+    * Login with valid user and password
+    * But try send to address not authorised - Non Persistent.
+    * Should have same behaviour as Persistent with exception on send.
+    */
+   @Test
+   public void 
testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws 
Exception {
+      ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("tcp://localhost:61616");
+      connectionFactory.setConfirmationWindowSize(100);
+      connectionFactory.setBlockOnDurableSend(false);
+      connectionFactory.setBlockOnNonDurableSend(false);
+      Connection connection = connectionFactory.createConnection("guest", 
"guest");
+      Session session = connection.createSession();
+      Destination destination = session.createQueue("guest.cannot.send");
+      MessageProducer messageProducer = session.createProducer(destination);
+      messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      try {
+         AtomicReference<Exception> e = new AtomicReference<>();
+         //        messageProducer.send(session.createTextMessage("hello"));
+
+         CountDownLatch countDownLatch = new CountDownLatch(1);
+         messageProducer.send(session.createTextMessage("hello"), new 
CompletionListener() {
+            @Override
+            public void onCompletion(Message message) {
+               countDownLatch.countDown();
+            }
+
+            @Override
+            public void onException(Message message, Exception exception) {
+               e.set(exception);
+               countDownLatch.countDown();
+            }
+         });
+         countDownLatch.await(10, TimeUnit.SECONDS);
+         if (e.get() != null) {
+            throw e.get();
+         }
+         fail("JMSSecurityException expected as guest is not allowed to send");
+      } catch (JMSSecurityException activeMQSecurityException) {
+         activeMQSecurityException.printStackTrace();
+      }
+      connection.close();
+   }
+
    /* Now some client id tests */
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/tests/jms-tests/src/test/resources/broker.xml
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/resources/broker.xml 
b/tests/jms-tests/src/test/resources/broker.xml
index 733e8c3..644ce83 100644
--- a/tests/jms-tests/src/test/resources/broker.xml
+++ b/tests/jms-tests/src/test/resources/broker.xml
@@ -54,6 +54,16 @@
             <permission type="browse" roles="guest,def"/>
             <permission type="send" roles="guest,def"/>
          </security-setting>
+
+         <security-setting match="guest.cannot.send">
+             <permission type="createDurableQueue" roles="guest,def"/>
+             <permission type="deleteDurableQueue" roles="guest,def"/>
+             <permission type="createNonDurableQueue" roles="guest,def"/>
+             <permission type="deleteNonDurableQueue" roles="guest,def"/>
+             <permission type="consume" roles="guest,def"/>
+             <permission type="browse" roles="guest,def"/>
+             <permission type="send" roles="def"/>
+         </security-setting>
      </security-settings>
    </core>
 </configuration>
\ No newline at end of file

Reply via email to