Repository: activemq-artemis
Updated Branches:
  refs/heads/master bfa679c17 -> ace43c8ff


ARTEMIS-1051 using ServerSession's own lock


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e2b2e247
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e2b2e247
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e2b2e247

Branch: refs/heads/master
Commit: e2b2e247d9925d4e0ff033ff63b7ef18b4e4a4f2
Parents: 8394fec
Author: Clebert Suconic <[email protected]>
Authored: Mon Mar 20 21:56:07 2017 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Tue Mar 21 09:14:24 2017 -0400

----------------------------------------------------------------------
 .../core/server/impl/ServerSessionImpl.java     | 162 +++++++++----------
 1 file changed, 78 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e2b2e247/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index af1c532..97a4249 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -182,8 +182,6 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    private Set<Closeable> closeables;
 
-   private final Object sendLock = new Object();
-
    public ServerSessionImpl(final String name,
                             final String username,
                             final String password,
@@ -1287,61 +1285,59 @@ public class ServerSessionImpl implements 
ServerSession, FailureListener {
    }
 
    @Override
-   public RoutingStatus send(Transaction tx,
-                             final Message message,
-                             final boolean direct,
-                             boolean noAutoCreateQueue) throws Exception {
+   public synchronized RoutingStatus send(Transaction tx,
+                                          final Message message,
+                                          final boolean direct,
+                                          boolean noAutoCreateQueue) throws 
Exception {
 
-      synchronized (sendLock) {
-         // If the protocol doesn't support flow control, we have no choice 
other than fail the communication
-         if (!this.getRemotingConnection().isSupportsFlowControl() && 
pagingManager.isDiskFull()) {
-            ActiveMQIOErrorException exception = 
ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
-            this.getRemotingConnection().fail(exception);
-            throw exception;
-         }
+      // If the protocol doesn't support flow control, we have no choice other 
than fail the communication
+      if (!this.getRemotingConnection().isSupportsFlowControl() && 
pagingManager.isDiskFull()) {
+         ActiveMQIOErrorException exception = 
ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
+         this.getRemotingConnection().fail(exception);
+         throw exception;
+      }
 
-         RoutingStatus result = RoutingStatus.OK;
-         //large message may come from StompSession directly, in which
-         //case the id header already generated.
-         if (!message.isLargeMessage()) {
-            long id = storageManager.generateID();
-            // This will re-encode the message
-            message.setMessageID(id);
-         }
+      RoutingStatus result = RoutingStatus.OK;
+      //large message may come from StompSession directly, in which
+      //case the id header already generated.
+      if (!message.isLargeMessage()) {
+         long id = storageManager.generateID();
+         // This will re-encode the message
+         message.setMessageID(id);
+      }
 
-         if (server.getConfiguration().isPopulateValidatedUser() && 
validatedUser != null) {
-            message.putStringProperty(Message.HDR_VALIDATED_USER, 
SimpleString.toSimpleString(validatedUser));
-         }
+      if (server.getConfiguration().isPopulateValidatedUser() && validatedUser 
!= null) {
+         message.putStringProperty(Message.HDR_VALIDATED_USER, 
SimpleString.toSimpleString(validatedUser));
+      }
 
-         SimpleString address = message.getAddressSimpleString();
+      SimpleString address = message.getAddressSimpleString();
 
-         if (defaultAddress == null && address != null) {
-            defaultAddress = address;
-         }
+      if (defaultAddress == null && address != null) {
+         defaultAddress = address;
+      }
 
-         if (address == null) {
-            // We don't want to force a re-encode when the message gets sent 
to the consumer
-            message.setAddress(defaultAddress);
-         }
+      if (address == null) {
+         // We don't want to force a re-encode when the message gets sent to 
the consumer
+         message.setAddress(defaultAddress);
+      }
 
-         if (logger.isTraceEnabled()) {
-            logger.trace("send(message=" + message + ", direct=" + direct + ") 
being called");
-         }
+      if (logger.isTraceEnabled()) {
+         logger.trace("send(message=" + message + ", direct=" + direct + ") 
being called");
+      }
 
-         if (message.getAddress() == null) {
-            // This could happen with some tests that are ignoring messages
-            throw ActiveMQMessageBundle.BUNDLE.noAddress();
-         }
+      if (message.getAddress() == null) {
+         // This could happen with some tests that are ignoring messages
+         throw ActiveMQMessageBundle.BUNDLE.noAddress();
+      }
 
-         if (message.getAddressSimpleString().equals(managementAddress)) {
-            // It's a management message
+      if (message.getAddressSimpleString().equals(managementAddress)) {
+         // It's a management message
 
-            handleManagementMessage(tx, message, direct);
-         } else {
-            result = doSend(tx, message, address, direct, noAutoCreateQueue);
-         }
-         return result;
+         handleManagementMessage(tx, message, direct);
+      } else {
+         result = doSend(tx, message, address, direct, noAutoCreateQueue);
       }
+      return result;
    }
 
 
@@ -1615,16 +1611,15 @@ public class ServerSessionImpl implements 
ServerSession, FailureListener {
    }
 
    @Override
-   public RoutingStatus doSend(final Transaction tx,
-                               final Message msg,
-                               final SimpleString originalAddress,
-                               final boolean direct,
-                               final boolean noAutoCreateQueue) throws 
Exception {
+   public synchronized RoutingStatus doSend(final Transaction tx,
+                                            final Message msg,
+                                            final SimpleString originalAddress,
+                                            final boolean direct,
+                                            final boolean noAutoCreateQueue) 
throws Exception {
 
-      synchronized (sendLock) {
-         RoutingStatus result = RoutingStatus.OK;
+      RoutingStatus result = RoutingStatus.OK;
 
-         RoutingType routingType = msg.getRouteType();
+      RoutingType routingType = msg.getRouteType();
 
          /* TODO-now: How to address here with AMQP?
          if (originalAddress != null) {
@@ -1635,43 +1630,42 @@ public class ServerSessionImpl implements 
ServerSession, FailureListener {
             }
          } */
 
-         Pair<SimpleString, RoutingType> art = 
getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
+      Pair<SimpleString, RoutingType> art = 
getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
 
-         // Consumer
-         // check the user has write access to this address.
-         try {
-            securityCheck(art.getA(), CheckType.SEND, this);
-         } catch (ActiveMQException e) {
-            if (!autoCommitSends && tx != null) {
-               tx.markAsRollbackOnly(e);
-            }
-            throw e;
+      // Consumer
+      // check the user has write access to this address.
+      try {
+         securityCheck(art.getA(), CheckType.SEND, this);
+      } catch (ActiveMQException e) {
+         if (!autoCommitSends && tx != null) {
+            tx.markAsRollbackOnly(e);
          }
+         throw e;
+      }
 
-         if (tx == null || autoCommitSends) {
-         } else {
-            routingContext.setTransaction(tx);
-         }
+      if (tx == null || autoCommitSends) {
+      } else {
+         routingContext.setTransaction(tx);
+      }
 
-         try {
-            routingContext.setAddress(art.getA());
-            routingContext.setRoutingType(art.getB());
+      try {
+         routingContext.setAddress(art.getA());
+         routingContext.setRoutingType(art.getB());
 
-            result = postOffice.route(msg, routingContext, direct);
+         result = postOffice.route(msg, routingContext, direct);
 
-            Pair<Object, AtomicLong> value = 
targetAddressInfos.get(msg.getAddressSimpleString());
+         Pair<Object, AtomicLong> value = 
targetAddressInfos.get(msg.getAddressSimpleString());
 
-            if (value == null) {
-               targetAddressInfos.put(msg.getAddressSimpleString(), new 
Pair<>(msg.getUserID(), new AtomicLong(1)));
-            } else {
-               value.setA(msg.getUserID());
-               value.getB().incrementAndGet();
-            }
-         } finally {
-            routingContext.clear();
+         if (value == null) {
+            targetAddressInfos.put(msg.getAddressSimpleString(), new 
Pair<>(msg.getUserID(), new AtomicLong(1)));
+         } else {
+            value.setA(msg.getUserID());
+            value.getB().incrementAndGet();
          }
-         return result;
+      } finally {
+         routingContext.clear();
       }
+      return result;
    }
 
    @Override
@@ -1699,7 +1693,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    @Override
    public Pair<SimpleString, RoutingType> 
getAddressAndRoutingType(SimpleString address,
-                                                                    
RoutingType defaultRoutingType) {
+                                                                   RoutingType 
defaultRoutingType) {
       if (prefixEnabled) {
          return PrefixUtil.getAddressAndRoutingType(address, 
defaultRoutingType, prefixes);
       }
@@ -1708,7 +1702,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    @Override
    public Pair<SimpleString, Set<RoutingType>> 
getAddressAndRoutingTypes(SimpleString address,
-                                                                          
Set<RoutingType> defaultRoutingTypes) {
+                                                                         
Set<RoutingType> defaultRoutingTypes) {
       if (prefixEnabled) {
          return PrefixUtil.getAddressAndRoutingTypes(address, 
defaultRoutingTypes, prefixes);
       }

Reply via email to