Repository: activemq-artemis Updated Branches: refs/heads/master bfa679c17 -> ace43c8ff
ARTEMIS-1051 using ServerSession's own lock Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e2b2e247 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e2b2e247 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e2b2e247 Branch: refs/heads/master Commit: e2b2e247d9925d4e0ff033ff63b7ef18b4e4a4f2 Parents: 8394fec Author: Clebert Suconic <[email protected]> Authored: Mon Mar 20 21:56:07 2017 -0400 Committer: Clebert Suconic <[email protected]> Committed: Tue Mar 21 09:14:24 2017 -0400 ---------------------------------------------------------------------- .../core/server/impl/ServerSessionImpl.java | 162 +++++++++---------- 1 file changed, 78 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e2b2e247/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index af1c532..97a4249 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -182,8 +182,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener { private Set<Closeable> closeables; - private final Object sendLock = new Object(); - public ServerSessionImpl(final String name, final String username, final String password, @@ -1287,61 +1285,59 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override - public RoutingStatus send(Transaction tx, - final Message message, - final boolean direct, - boolean noAutoCreateQueue) throws Exception { + public synchronized RoutingStatus send(Transaction tx, + final Message message, + final boolean direct, + boolean noAutoCreateQueue) throws Exception { - synchronized (sendLock) { - // If the protocol doesn't support flow control, we have no choice other than fail the communication - if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { - ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); - this.getRemotingConnection().fail(exception); - throw exception; - } + // If the protocol doesn't support flow control, we have no choice other than fail the communication + if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { + ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); + this.getRemotingConnection().fail(exception); + throw exception; + } - RoutingStatus result = RoutingStatus.OK; - //large message may come from StompSession directly, in which - //case the id header already generated. - if (!message.isLargeMessage()) { - long id = storageManager.generateID(); - // This will re-encode the message - message.setMessageID(id); - } + RoutingStatus result = RoutingStatus.OK; + //large message may come from StompSession directly, in which + //case the id header already generated. + if (!message.isLargeMessage()) { + long id = storageManager.generateID(); + // This will re-encode the message + message.setMessageID(id); + } - if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) { - message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser)); - } + if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) { + message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser)); + } - SimpleString address = message.getAddressSimpleString(); + SimpleString address = message.getAddressSimpleString(); - if (defaultAddress == null && address != null) { - defaultAddress = address; - } + if (defaultAddress == null && address != null) { + defaultAddress = address; + } - if (address == null) { - // We don't want to force a re-encode when the message gets sent to the consumer - message.setAddress(defaultAddress); - } + if (address == null) { + // We don't want to force a re-encode when the message gets sent to the consumer + message.setAddress(defaultAddress); + } - if (logger.isTraceEnabled()) { - logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); - } + if (logger.isTraceEnabled()) { + logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); + } - if (message.getAddress() == null) { - // This could happen with some tests that are ignoring messages - throw ActiveMQMessageBundle.BUNDLE.noAddress(); - } + if (message.getAddress() == null) { + // This could happen with some tests that are ignoring messages + throw ActiveMQMessageBundle.BUNDLE.noAddress(); + } - if (message.getAddressSimpleString().equals(managementAddress)) { - // It's a management message + if (message.getAddressSimpleString().equals(managementAddress)) { + // It's a management message - handleManagementMessage(tx, message, direct); - } else { - result = doSend(tx, message, address, direct, noAutoCreateQueue); - } - return result; + handleManagementMessage(tx, message, direct); + } else { + result = doSend(tx, message, address, direct, noAutoCreateQueue); } + return result; } @@ -1615,16 +1611,15 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override - public RoutingStatus doSend(final Transaction tx, - final Message msg, - final SimpleString originalAddress, - final boolean direct, - final boolean noAutoCreateQueue) throws Exception { + public synchronized RoutingStatus doSend(final Transaction tx, + final Message msg, + final SimpleString originalAddress, + final boolean direct, + final boolean noAutoCreateQueue) throws Exception { - synchronized (sendLock) { - RoutingStatus result = RoutingStatus.OK; + RoutingStatus result = RoutingStatus.OK; - RoutingType routingType = msg.getRouteType(); + RoutingType routingType = msg.getRouteType(); /* TODO-now: How to address here with AMQP? if (originalAddress != null) { @@ -1635,43 +1630,42 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } } */ - Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType); + Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType); - // Consumer - // check the user has write access to this address. - try { - securityCheck(art.getA(), CheckType.SEND, this); - } catch (ActiveMQException e) { - if (!autoCommitSends && tx != null) { - tx.markAsRollbackOnly(e); - } - throw e; + // Consumer + // check the user has write access to this address. + try { + securityCheck(art.getA(), CheckType.SEND, this); + } catch (ActiveMQException e) { + if (!autoCommitSends && tx != null) { + tx.markAsRollbackOnly(e); } + throw e; + } - if (tx == null || autoCommitSends) { - } else { - routingContext.setTransaction(tx); - } + if (tx == null || autoCommitSends) { + } else { + routingContext.setTransaction(tx); + } - try { - routingContext.setAddress(art.getA()); - routingContext.setRoutingType(art.getB()); + try { + routingContext.setAddress(art.getA()); + routingContext.setRoutingType(art.getB()); - result = postOffice.route(msg, routingContext, direct); + result = postOffice.route(msg, routingContext, direct); - Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString()); + Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString()); - if (value == null) { - targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1))); - } else { - value.setA(msg.getUserID()); - value.getB().incrementAndGet(); - } - } finally { - routingContext.clear(); + if (value == null) { + targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1))); + } else { + value.setA(msg.getUserID()); + value.getB().incrementAndGet(); } - return result; + } finally { + routingContext.clear(); } + return result; } @Override @@ -1699,7 +1693,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address, - RoutingType defaultRoutingType) { + RoutingType defaultRoutingType) { if (prefixEnabled) { return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes); } @@ -1708,7 +1702,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address, - Set<RoutingType> defaultRoutingTypes) { + Set<RoutingType> defaultRoutingTypes) { if (prefixEnabled) { return PrefixUtil.getAddressAndRoutingTypes(address, defaultRoutingTypes, prefixes); }
