Refactoring between Connection and protocol manager
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b0896b35 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b0896b35 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b0896b35 Branch: refs/heads/refactor-openwire Commit: b0896b353bf5ea60fba9f81b296791fd27535cf7 Parents: 4f39e04 Author: Clebert Suconic <[email protected]> Authored: Wed Feb 24 22:30:28 2016 -0500 Committer: Clebert Suconic <[email protected]> Committed: Thu Mar 17 14:10:46 2016 -0400 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 331 ++++++++++++++----- .../openwire/OpenWireProtocolManager.java | 325 +++--------------- .../core/protocol/openwire/amq/AMQConsumer.java | 20 +- .../openwire/amq/AMQProducerBrokerExchange.java | 96 ------ .../openwire/amq/AMQServerConsumer.java | 12 + .../core/protocol/openwire/amq/AMQSession.java | 21 +- .../artemis/core/server/ServerConsumer.java | 6 + .../server/SlowConsumerDetectionListener.java | 22 ++ .../artemis/core/server/impl/QueueImpl.java | 2 + .../core/server/impl/ServerConsumerImpl.java | 65 +--- 10 files changed, 369 insertions(+), 531 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 991f24b..6f2e3be 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -23,37 +23,45 @@ import javax.jms.ResourceAllocationException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.Bindings; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerConsumer; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange; -import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; +import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionControl; @@ -102,33 +110,32 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private final OpenWireProtocolManager protocolManager; - private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>(); - private boolean destroyed = false; private final Object sendLock = new Object(); - private final Acceptor acceptorUsed; - private final OpenWireFormat wireFormat; private AMQConnectionContext context; - private Throwable stopError = null; - private final AtomicBoolean stopping = new AtomicBoolean(false); - private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); - - protected final List<Command> dispatchQueue = new LinkedList<>(); - private boolean inServiceException; private final AtomicBoolean asyncException = new AtomicBoolean(false); + // Clebert: Artemis session has meta-data support, perhaps we could reuse it here + private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>(); + + private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<>(); private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<>(); + // Clebert TODO: Artemis already stores the Session. Why do we need a different one here + private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>(); + + + private ConnectionState state; private final Set<ActiveMQDestination> tempQueues = new ConcurrentHashSet<>(); @@ -139,14 +146,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private String defaultSocketURIString; - public OpenWireConnection(Acceptor acceptorUsed, - Connection connection, + public OpenWireConnection(Connection connection, Executor executor, OpenWireProtocolManager openWireProtocolManager, OpenWireFormat wf) { super(connection, executor); this.protocolManager = openWireProtocolManager; - this.acceptorUsed = acceptorUsed; this.wireFormat = wf; this.defaultSocketURIString = connection.getLocalAddress(); } @@ -322,8 +327,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } - // throw a WireFormatInfo to the peer - public void init() { + // send a WireFormatInfo to the peer + public void sendHandshake() { WireFormatInfo info = wireFormat.getPreferedWireFormatInfo(); sendCommand(info); } @@ -590,7 +595,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); } try { - protocolManager.removeConnection(this, this.getConnectionInfo(), me); + protocolManager.removeConnection(this.getConnectionInfo(), me); } catch (InvalidClientIDException e) { ActiveMQServerLogger.LOGGER.warn("Couldn't close connection because invalid clientID", e); @@ -681,40 +686,185 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se context.incRefCount(); } - /** This will answer with commands to the client */ + /** + * This will answer with commands to the client + */ public boolean sendCommand(final Command command) { if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { ActiveMQServerLogger.LOGGER.trace("sending " + command); } - synchronized (this) { - if (isDestroyed()) { - return false; + + if (isDestroyed()) { + return false; + } + + try { + physicalSend(command); + } + catch (Exception e) { + return false; + } + catch (Throwable t) { + return false; + } + return true; + } + + public void addDestination(DestinationInfo info) throws Exception { + ActiveMQDestination dest = info.getDestination(); + if (dest.isQueue()) { + SimpleString qName = OpenWireUtil.toCoreAddress(dest); + QueueBinding binding = (QueueBinding) protocolManager.getServer().getPostOffice().getBinding(qName); + if (binding == null) { + if (getState().getInfo() != null) { + + CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; + protocolManager.getServer().getSecurityStore().check(qName, checkType, this); + + protocolManager.getServer().checkQueueCreationLimit(getUsername()); + } + ConnectionInfo connInfo = getState().getInfo(); + protocolManager.getServer().createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary()); } - try { - physicalSend(command); + if (dest.isTemporary()) { + registerTempQueue(dest); } - catch (Exception e) { - return false; + } + + if (!AdvisorySupport.isAdvisoryTopic(dest)) { + AMQConnectionContext context = getContext(); + DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest); + + ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest); + protocolManager.fireAdvisory(context, topic, advInfo); + } + } + + + public void updateConsumer(ConsumerControl consumerControl) { + SessionId sessionId = consumerControl.getConsumerId().getParentId(); + AMQSession amqSession = sessions.get(sessionId); + amqSession.updateConsumerPrefetchSize(consumerControl.getConsumerId(), consumerControl.getPrefetch()); + } + + public void addConsumer(ConsumerInfo info) throws Exception { + // Todo: add a destination interceptors holder here (amq supports this) + SessionId sessionId = info.getConsumerId().getParentId(); + ConnectionId connectionId = sessionId.getParentId(); + ConnectionState cs = getState(); + if (cs == null) { + throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId); + } + SessionState ss = cs.getSessionState(sessionId); + if (ss == null) { + throw new IllegalStateException(protocolManager.getServer() + " Cannot add a consumer to a session that had not been registered: " + sessionId); + } + // Avoid replaying dup commands + if (!ss.getConsumerIds().contains(info.getConsumerId())) { + + AMQSession amqSession = sessions.get(sessionId); + if (amqSession == null) { + throw new IllegalStateException("Session not exist! : " + sessionId); + } + + amqSession.createConsumer(info, amqSession, new SlowConsumerDetection()); + + ss.addConsumer(info); + } + } + + class SlowConsumerDetection implements SlowConsumerDetectionListener { + + @Override + public void onSlowConsumer(ServerConsumer consumer) { + if (consumer instanceof AMQServerConsumer) { + AMQServerConsumer serverConsumer = (AMQServerConsumer)consumer; + ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getDestination()); + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + try { + advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, serverConsumer.getAmqConsumer().getId().toString()); + protocolManager.fireAdvisory(context, topic, advisoryMessage, serverConsumer.getAmqConsumer().getId()); + } + catch (Exception e) { + // TODO-NOW: LOGGING + e.printStackTrace(); + } } - catch (Throwable t) { - return false; + } + } + + public void addSessions(Set<SessionId> sessionSet) { + Iterator<SessionId> iter = sessionSet.iterator(); + while (iter.hasNext()) { + SessionId sid = iter.next(); + addSession(getState().getSessionState(sid).getInfo(), true); + } + } + + public AMQSession addSession(SessionInfo ss) { + return addSession(ss, false); + } + + public AMQSession addSession(SessionInfo ss, boolean internal) { + AMQSession amqSession = new AMQSession(getState().getInfo(), ss, protocolManager.getServer(), this, protocolManager.getScheduledPool(), protocolManager); + amqSession.initialize(); + amqSession.setInternal(internal); + sessions.put(ss.getSessionId(), amqSession); + sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId()); + return amqSession; + } + + public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception { + AMQSession session = sessions.remove(info.getSessionId()); + if (session != null) { + session.close(); + } + } + + public AMQSession getSession(SessionId sessionId) { + return sessions.get(sessionId); + } + + public void removeDestination(ActiveMQDestination dest) throws Exception { + if (dest.isQueue()) { + SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName()); + protocolManager.getServer().destroyQueue(qName); + } + else { + Bindings bindings = protocolManager.getServer().getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName())); + Iterator<Binding> iterator = bindings.getBindings().iterator(); + + while (iterator.hasNext()) { + Queue b = (Queue) iterator.next().getBindable(); + if (b.getConsumerCount() > 0) { + throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName()); + } + if (b.isDurable()) { + throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName()); + } + b.deleteQueue(); } - return true; + } + + if (!AdvisorySupport.isAdvisoryTopic(dest)) { + AMQConnectionContext context = getContext(); + DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest); + + ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest); + protocolManager.fireAdvisory(context, topic, advInfo); } } // This will listen for commands throught the protocolmanager public class CommandProcessor implements CommandVisitor { - public AMQConnectionContext getContext() { return OpenWireConnection.this.getContext(); } @Override public Response processAddConnection(ConnectionInfo info) throws Exception { - //let protoclmanager handle connection add/remove try { protocolManager.addConnection(OpenWireConnection.this, info); } @@ -739,7 +889,36 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processAddProducer(ProducerInfo info) throws Exception { Response resp = null; try { - protocolManager.addProducer(OpenWireConnection.this, info); + SessionId sessionId = info.getProducerId().getParentId(); + ConnectionId connectionId = sessionId.getParentId(); + ConnectionState cs = getState(); + if (cs == null) { + throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId); + } + SessionState ss = cs.getSessionState(sessionId); + if (ss == null) { + throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId); + } + // Avoid replaying dup commands + if (!ss.getProducerIds().contains(info.getProducerId())) { + + AMQSession amqSession = sessions.get(sessionId); + if (amqSession == null) { + throw new IllegalStateException("Session not exist! : " + sessionId); + } + + ActiveMQDestination destination = info.getDestination(); + if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { + if (destination.isQueue()) { + OpenWireUtil.validateDestination(destination, amqSession); + } + DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); + OpenWireConnection.this.addDestination(destInfo); + } + + ss.addProducer(info); + + } } catch (Exception e) { if (e instanceof ActiveMQSecurityException) { @@ -759,7 +938,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processAddConsumer(ConsumerInfo info) { Response resp = null; try { - protocolManager.addConsumer(OpenWireConnection.this, info); + addConsumer(info); } catch (Exception e) { e.printStackTrace(); @@ -776,13 +955,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processRemoveDestination(DestinationInfo info) throws Exception { ActiveMQDestination dest = info.getDestination(); - protocolManager.removeDestination(OpenWireConnection.this, dest); + removeDestination(dest); return null; } @Override public Response processRemoveProducer(ProducerId id) throws Exception { - protocolManager.removeProducer(id); + + // TODO-now: proper implement this method return null; } @@ -807,7 +987,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } state.removeSession(id); - protocolManager.removeSession(context, session.getInfo()); + removeSession(context, session.getInfo()); return null; } @@ -843,7 +1023,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processAddDestination(DestinationInfo dest) throws Exception { Response resp = null; try { - protocolManager.addDestination(OpenWireConnection.this, dest); + addDestination(dest); } catch (Exception e) { if (e instanceof ActiveMQSecurityException) { @@ -860,14 +1040,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processAddSession(SessionInfo info) throws Exception { // Avoid replaying dup commands if (!state.getSessionIds().contains(info.getSessionId())) { - protocolManager.addSession(OpenWireConnection.this, info); - try { - state.addSession(info); - } - catch (IllegalStateException e) { - e.printStackTrace(); - protocolManager.removeSession(context, info); - } + addSession(info); + state.addSession(info); } return null; } @@ -923,7 +1097,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se //amq5 clients send this command to restore prefetchSize //after successful reconnect try { - protocolManager.updateConsumer(OpenWireConnection.this, consumerControl); + updateConsumer(consumerControl); } catch (Exception e) { //log error @@ -976,33 +1150,31 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode(); - AMQSession session = protocolManager.getSession(producerId.getParentId()); + AMQSession session = getSession(producerId.getParentId()); - // TODO: canDispatch is always returning true; - if (producerExchange.canDispatch(messageSend)) { - SendingResult result = session.send(producerExchange, messageSend, sendProducerAck); - if (result.isBlockNextSend()) { - if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) { - // TODO see logging - throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); - } + SendingResult result = session.send(producerExchange, messageSend, sendProducerAck); + if (result.isBlockNextSend()) { + if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) { + // TODO see logging + throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); + } - if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) { - //in that case don't send the response - //this will force the client to wait until - //the response is got. - context.setDontSendReponse(true); - } - else { - //hang the connection until the space is available - session.blockingWaitForSpace(producerExchange, result); - } + if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) { + //in that case don't send the response + //this will force the client to wait until + //the response is got. + context.setDontSendReponse(true); } - else if (sendProducerAck) { - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); - OpenWireConnection.this.dispatchAsync(ack); + else { + //hang the connection until the space is available + session.blockingWaitForSpace(producerExchange, result); } } + else if (sendProducerAck) { + // TODO-now: send through OperationContext + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + OpenWireConnection.this.dispatchAsync(ack); + } } catch (Throwable e) { if (e instanceof ActiveMQSecurityException) { @@ -1056,15 +1228,26 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processRecoverTransactions(TransactionInfo info) throws Exception { Set<SessionId> sIds = state.getSessionIds(); - TransactionId[] recovered = protocolManager.recoverTransactions(sIds); - return new DataArrayResponse(recovered); + + + List<TransactionId> recovered = new ArrayList<>(); + if (sIds != null) { + for (SessionId sid : sIds) { + AMQSession s = sessions.get(sid); + if (s != null) { + s.recover(recovered); + } + } + } + + return new DataArrayResponse(recovered.toArray(new TransactionId[0])); } @Override public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception { //we let protocol manager to handle connection add/remove try { - protocolManager.removeConnection(OpenWireConnection.this, state.getInfo(), null); + protocolManager.removeConnection(state.getInfo(), null); } catch (Throwable e) { // log http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index add1455..bdf27f8 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -17,15 +17,12 @@ package org.apache.activemq.artemis.core.protocol.openwire; import javax.jms.InvalidClientIDException; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -40,26 +37,14 @@ import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; import org.apache.activemq.artemis.api.core.client.TopologyMember; -import org.apache.activemq.artemis.api.core.management.CoreNotificationType; -import org.apache.activemq.artemis.api.core.management.ManagementHelper; -import org.apache.activemq.artemis.core.io.IOCallback; -import org.apache.activemq.artemis.core.postoffice.Binding; -import org.apache.activemq.artemis.core.postoffice.Bindings; -import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; -import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.management.ManagementService; -import org.apache.activemq.artemis.core.server.management.Notification; -import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -69,7 +54,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; import org.apache.activemq.artemis.utils.DataConstants; -import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.BrokerId; @@ -78,31 +62,24 @@ import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; -import org.apache.activemq.command.SessionId; -import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormatFactory; -import org.apache.activemq.state.ConnectionState; import org.apache.activemq.state.ProducerState; -import org.apache.activemq.state.SessionState; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.LongSequenceGenerator; -public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener, ClusterTopologyListener { +public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener { private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); private static final IdGenerator ID_GENERATOR = new IdGenerator(); @@ -127,21 +104,16 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>(); protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<>(); + // Clebert TODO: use ConcurrentHashMap, or maybe use the schema that's already available on Artemis upstream (unique-client-id) private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>(); private String brokerName; - // Clebert TODO: Artemis already stores the Session. Why do we need a different one here - private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>(); - // Clebert: Artemis already has a Resource Manager. Need to remove this.. // The TransactionID extends XATransactionID, so all we need is to convert the XID here private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<>(); - // Clebert: Artemis session has meta-data support, perhaps we could reuse it here - private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>(); - private final Map<String, TopologyMember> topologyMap = new ConcurrentHashMap<>(); private final LinkedList<TopologyMember> members = new LinkedList<>(); @@ -163,9 +135,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); ManagementService service = server.getManagementService(); scheduledPool = server.getScheduledPool(); - if (service != null) { - service.addNotificationListener(this); - } final ClusterManager clusterManager = this.server.getClusterManager(); ClusterConnection cc = clusterManager.getDefaultConnection(null); @@ -187,6 +156,35 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No } } + + public void removeConnection(ConnectionInfo info, + Throwable error) throws InvalidClientIDException { + synchronized (clientIdSet) { + String clientId = info.getClientId(); + if (clientId != null) { + AMQConnectionContext context = this.clientIdSet.get(clientId); + if (context != null && context.decRefCount() == 0) { + //connection is still there and need to close + context.getConnection().disconnect(error != null); + this.connections.remove(this);//what's that for? + this.clientIdSet.remove(clientId); + } + } + else { + throw new InvalidClientIDException("No clientID specified for connection disconnect request"); + } + } + } + + + public ScheduledExecutorService getScheduledPool() { + return scheduledPool; + } + + public ActiveMQServer getServer() { + return server; + } + private void updateClientClusterInfo() { synchronized (members) { @@ -219,8 +217,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) { OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat(); - OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, server.getExecutorFactory().getExecutor(), this, wf); - owConn.init(); + OpenWireConnection owConn = new OpenWireConnection(connection, server.getExecutorFactory().getExecutor(), this, wf); + owConn.sendHandshake(); // TODO CLEBERT What is this constant here? we should get it from TTL initial pings return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000); @@ -233,7 +231,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No @Override public void removeHandler(String name) { - // TODO Auto-generated method stub } @Override @@ -276,8 +273,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No @Override public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { - // TODO Auto-generated method stub - } public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception { @@ -322,11 +317,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No fireAdvisory(context, topic, copy); // init the conn - addSessions(context.getConnection(), context.getConnectionState().getSessionIds()); + context.getConnection().addSessions( context.getConnectionState().getSessionIds()); } } - private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception { + public void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception { this.fireAdvisory(context, topic, copy, null); } @@ -341,7 +336,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No /* * See AdvisoryBroker.fireAdvisory() */ - private void fireAdvisory(AMQConnectionContext context, + public void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { @@ -448,198 +443,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No public boolean isStopping() { return false; } - - public void addProducer(OpenWireConnection theConn, ProducerInfo info) throws Exception { - SessionId sessionId = info.getProducerId().getParentId(); - ConnectionId connectionId = sessionId.getParentId(); - ConnectionState cs = theConn.getState(); - if (cs == null) { - throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId); - } - SessionState ss = cs.getSessionState(sessionId); - if (ss == null) { - throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId); - } - // Avoid replaying dup commands - if (!ss.getProducerIds().contains(info.getProducerId())) { - - AMQSession amqSession = sessions.get(sessionId); - if (amqSession == null) { - throw new IllegalStateException("Session not exist! : " + sessionId); - } - - ActiveMQDestination destination = info.getDestination(); - if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { - if (destination.isQueue()) { - OpenWireUtil.validateDestination(destination, amqSession); - } - DestinationInfo destInfo = new DestinationInfo(theConn.getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); - this.addDestination(theConn, destInfo); - } - - amqSession.createProducer(info); - - try { - ss.addProducer(info); - } - catch (IllegalStateException e) { - amqSession.removeProducer(info); - } - - } - - } - - public void updateConsumer(OpenWireConnection theConn, ConsumerControl consumerControl) { - SessionId sessionId = consumerControl.getConsumerId().getParentId(); - AMQSession amqSession = sessions.get(sessionId); - amqSession.updateConsumerPrefetchSize(consumerControl.getConsumerId(), consumerControl.getPrefetch()); - } - - public void addConsumer(OpenWireConnection theConn, ConsumerInfo info) throws Exception { - // Todo: add a destination interceptors holder here (amq supports this) - SessionId sessionId = info.getConsumerId().getParentId(); - ConnectionId connectionId = sessionId.getParentId(); - ConnectionState cs = theConn.getState(); - if (cs == null) { - throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId); - } - SessionState ss = cs.getSessionState(sessionId); - if (ss == null) { - throw new IllegalStateException(this.server + " Cannot add a consumer to a session that had not been registered: " + sessionId); - } - // Avoid replaying dup commands - if (!ss.getConsumerIds().contains(info.getConsumerId())) { - - AMQSession amqSession = sessions.get(sessionId); - if (amqSession == null) { - throw new IllegalStateException("Session not exist! : " + sessionId); - } - - amqSession.createConsumer(info, amqSession); - - ss.addConsumer(info); - } - } - - public void addSessions(OpenWireConnection theConn, Set<SessionId> sessionSet) { - Iterator<SessionId> iter = sessionSet.iterator(); - while (iter.hasNext()) { - SessionId sid = iter.next(); - addSession(theConn, theConn.getState().getSessionState(sid).getInfo(), true); - } - } - - public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss) { - return addSession(theConn, ss, false); - } - - public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss, boolean internal) { - AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss, server, theConn, scheduledPool, this); - amqSession.initialize(); - amqSession.setInternal(internal); - sessions.put(ss.getSessionId(), amqSession); - sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId()); - return amqSession; - } - - public void removeConnection(OpenWireConnection connection, - ConnectionInfo info, - Throwable error) throws InvalidClientIDException { - synchronized (clientIdSet) { - String clientId = info.getClientId(); - if (clientId != null) { - AMQConnectionContext context = this.clientIdSet.get(clientId); - if (context != null && context.decRefCount() == 0) { - //connection is still there and need to close - this.clientIdSet.remove(clientId); - connection.disconnect(error != null); - this.connections.remove(connection);//what's that for? - } - } - else { - throw new InvalidClientIDException("No clientID specified for connection disconnect request"); - } - } - } - - public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception { - AMQSession session = sessions.remove(info.getSessionId()); - if (session != null) { - session.close(); - } - } - - public void removeProducer(ProducerId id) { - SessionId sessionId = id.getParentId(); - AMQSession session = sessions.get(sessionId); - session.removeProducer(id); - } - - public AMQSession getSession(SessionId sessionId) { - return sessions.get(sessionId); - } - - public void removeDestination(OpenWireConnection connection, ActiveMQDestination dest) throws Exception { - if (dest.isQueue()) { - SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName()); - this.server.destroyQueue(qName); - } - else { - Bindings bindings = this.server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName())); - Iterator<Binding> iterator = bindings.getBindings().iterator(); - - while (iterator.hasNext()) { - Queue b = (Queue) iterator.next().getBindable(); - if (b.getConsumerCount() > 0) { - throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName()); - } - if (b.isDurable()) { - throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName()); - } - b.deleteQueue(); - } - } - - if (!AdvisorySupport.isAdvisoryTopic(dest)) { - AMQConnectionContext context = connection.getContext(); - DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest); - - ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest); - fireAdvisory(context, topic, advInfo); - } - } - - public void addDestination(OpenWireConnection connection, DestinationInfo info) throws Exception { - ActiveMQDestination dest = info.getDestination(); - if (dest.isQueue()) { - SimpleString qName = OpenWireUtil.toCoreAddress(dest); - QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName); - if (binding == null) { - if (connection.getState().getInfo() != null) { - - CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; - server.getSecurityStore().check(qName, checkType, connection); - - server.checkQueueCreationLimit(connection.getUsername()); - } - ConnectionInfo connInfo = connection.getState().getInfo(); - this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary()); - } - if (dest.isTemporary()) { - connection.registerTempQueue(dest); - } - } - - if (!AdvisorySupport.isAdvisoryTopic(dest)) { - AMQConnectionContext context = connection.getContext(); - DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest); - - ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest); - fireAdvisory(context, topic, advInfo); - } - } - public void endTransaction(TransactionInfo info) throws Exception { AMQSession txSession = transactions.get(info.getTransactionId()); @@ -682,19 +485,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No transactions.remove(info.getTransactionId()); } - public TransactionId[] recoverTransactions(Set<SessionId> sIds) { - List<TransactionId> recovered = new ArrayList<>(); - if (sIds != null) { - for (SessionId sid : sIds) { - AMQSession s = this.sessions.get(sid); - if (s != null) { - s.recover(recovered); - } - } - } - return recovered.toArray(new TransactionId[0]); - } - public boolean validateUser(String login, String passcode) { boolean validated = true; @@ -717,50 +507,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No /** * TODO: remove this, use the regular ResourceManager from the Server's - * */ + */ public void registerTx(TransactionId txId, AMQSession amqSession) { transactions.put(txId, amqSession); } - //advisory support - @Override - public void onNotification(Notification notif) { - try { - if (notif.getType() instanceof CoreNotificationType) { - CoreNotificationType type = (CoreNotificationType) notif.getType(); - switch (type) { - case CONSUMER_SLOW: - fireSlowConsumer(notif); - break; - default: - break; - } - } - } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.error("Failed to send notification " + notif, e); - } - } - - private void fireSlowConsumer(Notification notif) throws Exception { - SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME); - Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME); - SessionId sessionId = sessionIdMap.get(coreSessionId.toString()); - AMQSession session = sessions.get(sessionId); - AMQConsumer consumer = session.getConsumer(coreConsumerId); - ActiveMQDestination destination = consumer.getDestination(); - - if (!AdvisorySupport.isAdvisoryTopic(destination)) { - ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination); - ConnectionId connId = sessionId.getParentId(); - OpenWireConnection cc = this.brokerConnectionStates.get(connId); - ActiveMQMessage advisoryMessage = new ActiveMQMessage(); - advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString()); - - fireAdvisory(cc.getContext(), topic, advisoryMessage, consumer.getId()); - } - } - public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception { SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName())); server.destroyQueue(subQueueName); @@ -795,7 +546,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No return this.updateClusterClients; } - public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) { + public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) { this.updateClusterClientsOnRemove = updateClusterClientsOnRemove; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index b0f007a..221679f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -27,7 +27,14 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil; +import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; @@ -36,14 +43,9 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.TransactionId; import org.apache.activemq.wireformat.WireFormat; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil; -import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; public class AMQConsumer implements BrowserListener { + private AMQSession session; private org.apache.activemq.command.ActiveMQDestination actualDest; private ConsumerInfo info; @@ -72,7 +74,7 @@ public class AMQConsumer implements BrowserListener { } } - public void init() throws Exception { + public void init(SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { AMQServerSession coreSession = session.getCoreSession(); SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); @@ -127,7 +129,9 @@ public class AMQConsumer implements BrowserListener { coreSession.createQueue(address, subQueueName, selector, true, false); } - coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1); + AMQServerConsumer serverConsumer = (AMQServerConsumer) coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1); + serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); + serverConsumer.setAmqConsumer(this); } else { SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java index e9c4044..b5d8dbd 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java @@ -19,8 +19,6 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageId; import org.apache.activemq.state.ProducerState; public class AMQProducerBrokerExchange { @@ -28,7 +26,6 @@ public class AMQProducerBrokerExchange { private AMQConnectionContext connectionContext; private ProducerState producerState; private boolean mutable = true; - private AtomicLong lastSendSequenceNumber = new AtomicLong(-1); private final FlowControlInfo flowControlInfo = new FlowControlInfo(); public AMQProducerBrokerExchange() { @@ -57,13 +54,6 @@ public class AMQProducerBrokerExchange { } /** - * @return the mutable - */ - public boolean isMutable() { - return this.mutable; - } - - /** * @param mutable the mutable to set */ public void setMutable(boolean mutable) { @@ -84,75 +74,13 @@ public class AMQProducerBrokerExchange { this.producerState = producerState; } - /** - * Enforce duplicate suppression using info from persistence adapter - * - * @return false if message should be ignored as a duplicate - */ - public boolean canDispatch(Message messageSend) { - // TODO: auditProduceSequenceIds is never true - boolean canDispatch = true; - //TODO: DEAD CODE -// if (auditProducerSequenceIds && messageSend.isPersistent()) { -// final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId(); -// if (isNetworkProducer) { -// // messages are multiplexed on this producer so we need to query the -// // persistenceAdapter -// long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId()); -// if (producerSequenceId <= lastStoredForMessageProducer) { -// canDispatch = false; -// } -// } -// else if (producerSequenceId <= lastSendSequenceNumber.get()) { -// canDispatch = false; -// // TODO: WHAT IS THIS? -// if (messageSend.isInTransaction()) { -// -// -// } -// else { -// } -// } -// else { -// // track current so we can suppress duplicates later in the stream -// lastSendSequenceNumber.set(producerSequenceId); -// } -// } - return canDispatch; - } - - private long getStoredSequenceIdForMessage(MessageId messageId) { - return -1; - } - public void setLastStoredSequenceId(long l) { } - public void incrementSend() { - flowControlInfo.incrementSend(); - } - public void blockingOnFlowControl(boolean blockingOnFlowControl) { flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl); } - public boolean isBlockedForFlowControl() { - return flowControlInfo.isBlockingOnFlowControl(); - } - - public void resetFlowControl() { - flowControlInfo.reset(); - } - - public long getTotalTimeBlocked() { - return flowControlInfo.getTotalTimeBlocked(); - } - - public int getPercentageBlocked() { - double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends(); - return (int) value * 100; - } - public static class FlowControlInfo { private AtomicBoolean blockingOnFlowControl = new AtomicBoolean(); @@ -160,10 +88,6 @@ public class AMQProducerBrokerExchange { private AtomicLong sendsBlocked = new AtomicLong(); private AtomicLong totalTimeBlocked = new AtomicLong(); - public boolean isBlockingOnFlowControl() { - return blockingOnFlowControl.get(); - } - public void setBlockingOnFlowControl(boolean blockingOnFlowControl) { this.blockingOnFlowControl.set(blockingOnFlowControl); if (blockingOnFlowControl) { @@ -171,30 +95,10 @@ public class AMQProducerBrokerExchange { } } - public long getTotalSends() { - return totalSends.get(); - } - - public void incrementSend() { - this.totalSends.incrementAndGet(); - } - - public long getSendsBlocked() { - return sendsBlocked.get(); - } - public void incrementSendBlocked() { this.sendsBlocked.incrementAndGet(); } - public long getTotalTimeBlocked() { - return totalTimeBlocked.get(); - } - - public void incrementTimeBlocked(long time) { - this.totalTimeBlocked.addAndGet(time); - } - public void reset() { blockingOnFlowControl.set(false); totalSends.set(0); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java index 3e7afa5..f198cb7 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java @@ -34,6 +34,18 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; public class AMQServerConsumer extends ServerConsumerImpl { + // TODO-NOW: remove this once unified + AMQConsumer amqConsumer; + + public AMQConsumer getAmqConsumer() { + return amqConsumer; + } + + /** TODO-NOW: remove this once unified */ + public void setAmqConsumer(AMQConsumer amqConsumer) { + this.amqConsumer = amqConsumer; + } + public AMQServerConsumer(long consumerID, AMQServerSession serverSession, QueueBinding binding, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 0cee3d3..d16d4c8 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; @@ -71,8 +72,6 @@ public class AMQSession implements SessionCallback { private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<>(); - private Map<Long, AMQProducer> producers = new HashMap<>(); - private AtomicBoolean started = new AtomicBoolean(false); private TransactionId txId = null; @@ -121,7 +120,7 @@ public class AMQSession implements SessionCallback { } - public void createConsumer(ConsumerInfo info, AMQSession amqSession) throws Exception { + public void createConsumer(ConsumerInfo info, AMQSession amqSession, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { //check destination ActiveMQDestination dest = info.getDestination(); ActiveMQDestination[] dests = null; @@ -139,7 +138,7 @@ public class AMQSession implements SessionCallback { } AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool); - consumer.init(); + consumer.init(slowConsumerDetectionListener); consumerMap.put(d, consumer); consumers.put(consumer.getNativeId(), consumer); } @@ -233,20 +232,6 @@ public class AMQSession implements SessionCallback { consumers.remove(consumerId); } - public void createProducer(ProducerInfo info) throws Exception { - AMQProducer producer = new AMQProducer(this, info); - producer.init(); - producers.put(info.getProducerId().getValue(), producer); - } - - public void removeProducer(ProducerInfo info) { - removeProducer(info.getProducerId()); - } - - public void removeProducer(ProducerId id) { - producers.remove(id.getValue()); - } - public SendingResult send(AMQProducerBrokerExchange producerExchange, Message messageSend, boolean sendProducerAck) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java index 6045e2c..d75efdd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java @@ -25,6 +25,12 @@ import org.apache.activemq.artemis.core.transaction.Transaction; */ public interface ServerConsumer extends Consumer { + void setlowConsumerDetection(SlowConsumerDetectionListener listener); + + SlowConsumerDetectionListener getSlowConsumerDetecion(); + + void fireSlowConsumer(); + /** * @param protocolContext * @see #getProtocolContext() http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java new file mode 100644 index 0000000..0c60f25 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server; + +public interface SlowConsumerDetectionListener { + void onSlowConsumer(ServerConsumer consumer); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 8bf5d08..86ca36c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2930,6 +2930,8 @@ public class QueueImpl implements Queue { } } + serverConsumer.fireSlowConsumer(); + if (connection != null) { ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate); if (policy.equals(SlowConsumerPolicy.KILL)) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 422d324..545b4dc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -88,8 +89,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private Object protocolContext; - private final ActiveMQServer server; - /** * We get a readLock when a message is handled, and return the readLock when the message is finally delivered * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished @@ -152,9 +151,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { final SessionCallback callback, final boolean preAcknowledge, final boolean strictUpdateDeliveryCount, - final ManagementService managementService, - final ActiveMQServer server) throws Exception { - this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null, server); + final ManagementService managementService) throws Exception { + this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null); } public ServerConsumerImpl(final long id, @@ -169,8 +167,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { final boolean strictUpdateDeliveryCount, final ManagementService managementService, final boolean supportLargeMessage, - final Integer credits, - final ActiveMQServer server) throws Exception { + final Integer credits) throws Exception { this.id = id; this.filter = filter; @@ -215,8 +212,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { availableCredits.set(credits); } } - - this.server = server; } @Override @@ -386,9 +381,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } finally { lockDelivery.readLock().unlock(); - callback.afterDelivery(); } - } @Override @@ -569,19 +562,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void setStarted(final boolean started) { synchronized (lock) { - boolean locked = lockDelivery(); - - // This is to make sure nothing would sneak to the client while started = false - // the client will stop the session and perform a rollback in certain cases. - // in case something sneaks to the client you could get to messaging delivering forever until - // you restart the server + lockDelivery.writeLock().lock(); try { this.started = browseOnly || started; } finally { - if (locked) { - lockDelivery.writeLock().unlock(); - } + lockDelivery.writeLock().unlock(); } } @@ -591,39 +577,22 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } - private boolean lockDelivery() { - try { - if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) { - ActiveMQServerLogger.LOGGER.timeoutLockingConsumer(); - if (server != null) { - server.threadDump(); - } - return false; - } - return true; - } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - return false; - } - } - @Override public void setTransferring(final boolean transferring) { synchronized (lock) { - // This is to make sure that the delivery process has finished any pending delivery - // otherwise a message may sneak in on the client while we are trying to stop the consumer - boolean locked = lockDelivery(); - try { - this.transferring = transferring; - } - finally { - if (locked) { - lockDelivery.writeLock().unlock(); - } - } + this.transferring = transferring; } + // This is to make sure that the delivery process has finished any pending delivery + // otherwise a message may sneak in on the client while we are trying to stop the consumer + try { + lockDelivery.writeLock().lock(); + } + finally { + lockDelivery.writeLock().unlock(); + } + + // Outside the lock if (transferring) { // And we must wait for any force delivery to be executed - this is executed async so we add a future to the
