Some tweaks to the code
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0e43a561 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0e43a561 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0e43a561 Branch: refs/heads/refactor-openwire Commit: 0e43a5615814991d163e6cb640c834aaca2a2180 Parents: 1ca8967 Author: Clebert Suconic <[email protected]> Authored: Fri Mar 18 14:30:52 2016 -0400 Committer: Clebert Suconic <[email protected]> Committed: Sat Mar 19 01:07:37 2016 -0400 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 17 +-- .../openwire/OpenWireMessageConverter.java | 2 +- .../amq/AMQCompositeConsumerBrokerExchange.java | 9 +- .../core/protocol/openwire/amq/AMQConsumer.java | 144 ++++++++++--------- .../core/protocol/openwire/amq/AMQSession.java | 45 +++--- 5 files changed, 125 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e43a561/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 17f26b0..e8259c3 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -426,16 +426,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } - public void addConsumerBrokerExchange(ConsumerId id, + private void addConsumerBrokerExchange(ConsumerId id, AMQSession amqSession, - Map<ActiveMQDestination, AMQConsumer> consumerMap) { + List<AMQConsumer> consumerList) { AMQConsumerBrokerExchange result = consumerExchanges.get(id); if (result == null) { - if (consumerMap.size() == 1) { - result = new AMQSingleConsumerBrokerExchange(amqSession, consumerMap.values().iterator().next()); + if (consumerList.size() == 1) { + result = new AMQSingleConsumerBrokerExchange(amqSession, consumerList.get(0)); } else { - result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerMap); + result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerList); } synchronized (consumerExchanges) { consumerExchanges.put(id, result); @@ -717,9 +717,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se throw new IllegalStateException("Session not exist! : " + sessionId); } - amqSession.createConsumer(info, amqSession, new SlowConsumerDetection()); + List<AMQConsumer> consumersList = amqSession.createConsumer(info, amqSession, new SlowConsumerDetection()); + this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList); ss.addConsumer(info); + amqSession.start(); } } @@ -729,7 +731,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public void onSlowConsumer(ServerConsumer consumer) { if (consumer instanceof AMQServerConsumer) { AMQServerConsumer serverConsumer = (AMQServerConsumer)consumer; - ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getDestination()); + ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getOpenwireDestination()); ActiveMQMessage advisoryMessage = new ActiveMQMessage(); try { advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, serverConsumer.getAmqConsumer().getId().toString()); @@ -1002,7 +1004,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { - new Exception("commit").printStackTrace(); try { protocolManager.commitTransactionOnePhase(info); TransactionId txId = info.getTransactionId(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e43a561/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 6176490..89f71ed 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -443,7 +443,7 @@ public class OpenWireMessageConverter implements MessageConverter { public static MessageDispatch createMessageDispatch(ServerMessage message, int deliveryCount, AMQConsumer consumer) throws IOException, JMSException { - ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination()); + ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination()); MessageDispatch md = new MessageDispatch(); md.setConsumerId(consumer.getId()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e43a561/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java index 7e83767..56b4b6d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java @@ -20,15 +20,20 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessagePull; +import java.util.HashMap; +import java.util.List; import java.util.Map; public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchange { private final Map<ActiveMQDestination, AMQConsumer> consumerMap; - public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap) { + public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, List<AMQConsumer> consumerList) { super(amqSession); - this.consumerMap = consumerMap; + this.consumerMap = new HashMap<>(); + for (AMQConsumer consumer : consumerList) { + consumerMap.put(consumer.getOpenwireDestination(), consumer); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e43a561/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index d296213..b4056fb 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -35,7 +35,6 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; @@ -49,11 +48,10 @@ import org.apache.activemq.wireformat.WireFormat; public class AMQConsumer { private AMQSession session; - private org.apache.activemq.command.ActiveMQDestination actualDest; + private org.apache.activemq.command.ActiveMQDestination openwireDestination; private ConsumerInfo info; private final ScheduledExecutorService scheduledPool; private long nativeId = -1; - private SimpleString subQueueName = null; private int prefetchSize; private AtomicInteger windowAvailable; @@ -66,7 +64,7 @@ public class AMQConsumer { ConsumerInfo info, ScheduledExecutorService scheduledPool) { this.session = amqSession; - this.actualDest = d; + this.openwireDestination = d; this.info = info; this.scheduledPool = scheduledPool; this.prefetchSize = info.getPrefetchSize(); @@ -76,73 +74,38 @@ public class AMQConsumer { } } - public void init(SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { - AMQServerSession coreSession = session.getCoreSession(); - - SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); - - nativeId = session.getCoreServer().getStorageManager().generateID(); - - SimpleString address = new SimpleString(this.actualDest.getPhysicalName()); - - if (this.actualDest.isTopic()) { - String physicalName = this.actualDest.getPhysicalName(); - if (physicalName.contains(".>")) { - //wildcard - physicalName = OpenWireUtil.convertWildcard(physicalName); - } - - // on recreate we don't need to create queues - address = new SimpleString("jms.topic." + physicalName); - if (info.isDurable()) { - subQueueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, info.getClientId(), info.getSubscriptionName())); - - QueueQueryResult result = coreSession.executeQueueQuery(subQueueName); - if (result.isExists()) { - // Already exists - if (result.getConsumerCount() > 0) { - throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)"); - } + public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { + this.nativeId = nativeId; + AMQServerConsumer serverConsumer = createServerConsumer(info, slowConsumerDetectionListener); + serverConsumer.setAmqConsumer(this); + } - SimpleString oldFilterString = result.getFilterString(); - boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector); + private AMQServerConsumer createServerConsumer(ConsumerInfo info, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { - SimpleString oldTopicName = result.getAddress(); + SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); - boolean topicChanged = !oldTopicName.equals(address); + String physicalName = OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName()); - if (selectorChanged || topicChanged) { - // Delete the old durable sub - coreSession.deleteQueue(subQueueName); + SimpleString address; - // Create the new one - coreSession.createQueue(address, subQueueName, selector, false, true); - } + if (openwireDestination.isTopic()) { + address = new SimpleString("jms.topic." + physicalName); - } - else { - coreSession.createQueue(address, subQueueName, selector, false, true); - } - } - else { - subQueueName = new SimpleString(UUID.randomUUID().toString()); + SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address); - coreSession.createQueue(address, subQueueName, selector, true, false); - } - - AMQServerConsumer serverConsumer = (AMQServerConsumer) coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1); + AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); - serverConsumer.setAmqConsumer(this); + return serverConsumer; } else { - SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName()); - AMQServerConsumer serverConsumer = (AMQServerConsumer)coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); - serverConsumer.setAmqConsumer(this); + SimpleString queueName = new SimpleString("jms.queue." + physicalName); + AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); + serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString()); if (addrSettings != null) { //see PolicyEntry - if (prefetchSize != 0 && addrSettings.getQueuePrefetch() == 0) { + if (info.getPrefetchSize() != 0 && addrSettings.getQueuePrefetch() == 0) { //sends back a ConsumerControl ConsumerControl cc = new ConsumerControl(); cc.setConsumerId(info.getConsumerId()); @@ -150,9 +113,63 @@ public class AMQConsumer { session.getConnection().dispatch(cc); } } + + return serverConsumer; + + } + + } + + private SimpleString createTopicSubscription(boolean isDurable, + String clientID, + String physicalName, + String subscriptionName, + SimpleString selector, + SimpleString address) throws Exception { + + SimpleString queueName; + + if (isDurable) { + queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName)); + QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName); + if (result.isExists()) { + // Already exists + if (result.getConsumerCount() > 0) { + throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)"); + } + + SimpleString oldFilterString = result.getFilterString(); + + boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector); + + SimpleString oldTopicName = result.getAddress(); + + boolean topicChanged = !oldTopicName.equals(address); + + if (selectorChanged || topicChanged) { + // Delete the old durable sub + session.getCoreSession().deleteQueue(queueName); + + // Create the new one + session.getCoreSession().createQueue(address, queueName, selector, false, true); + } + } + else { + session.getCoreSession().createQueue(address, queueName, selector, false, true); + } + } + else { + queueName = new SimpleString(UUID.randomUUID().toString()); + + session.getCoreSession().createQueue(address, queueName, selector, true, false); + } + + return queueName; } + + public long getNativeId() { return this.nativeId; } @@ -200,7 +217,7 @@ public class AMQConsumer { public void handleDeliverNullDispatch() { MessageDispatch md = new MessageDispatch(); md.setConsumerId(getId()); - md.setDestination(actualDest); + md.setDestination(openwireDestination); session.deliverMessage(md); windowAvailable.decrementAndGet(); } @@ -351,10 +368,6 @@ public class AMQConsumer { } } - public org.apache.activemq.command.ActiveMQDestination getDestination() { - return actualDest; - } - public ConsumerInfo getInfo() { return info; } @@ -375,8 +388,8 @@ public class AMQConsumer { session.removeConsumer(nativeId); } - public org.apache.activemq.command.ActiveMQDestination getActualDestination() { - return actualDest; + public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() { + return openwireDestination; } public void setPrefetchSize(int prefetchSize) { @@ -388,6 +401,9 @@ public class AMQConsumer { } } + /** + * The MessagePullHandler is used with slow consumer policies. + * */ private class MessagePullHandler { private long next = -1; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e43a561/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 86ea582..4675dca 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import javax.jms.ResourceAllocationException; import javax.transaction.xa.Xid; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -45,6 +44,8 @@ import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.apache.activemq.artemis.utils.IDGenerator; +import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerId; @@ -62,6 +63,10 @@ import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.wireformat.WireFormat; public class AMQSession implements SessionCallback { + + // ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session + protected final IDGenerator idGenerator = new SimpleIDGenerator(0); + private ConnectionInfo connInfo; private AMQServerSession coreSession; private SessionInfo sessInfo; @@ -98,7 +103,7 @@ public class AMQSession implements SessionCallback { this.connection = connection; this.scheduledPool = scheduledPool; this.manager = manager; - OpenWireFormat marshaller = (OpenWireFormat)connection.getMarshaller(); + OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller(); this.converter = new OpenWireMessageConverter(marshaller.copy()); } @@ -130,7 +135,9 @@ public class AMQSession implements SessionCallback { } - public void createConsumer(ConsumerInfo info, AMQSession amqSession, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { + public List<AMQConsumer> createConsumer(ConsumerInfo info, + AMQSession amqSession, + SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { //check destination ActiveMQDestination dest = info.getDestination(); ActiveMQDestination[] dests = null; @@ -140,25 +147,32 @@ public class AMQSession implements SessionCallback { else { dests = new ActiveMQDestination[]{dest}; } - Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>(); - for (ActiveMQDestination d : dests) { - if (d.isQueue()) { - SimpleString queueName = OpenWireUtil.toCoreAddress(d); +// Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>(); + List<AMQConsumer> consumersList = new java.util.LinkedList<>(); + + for (ActiveMQDestination openWireDest : dests) { + if (openWireDest.isQueue()) { + SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest); getCoreServer().getJMSQueueCreator().create(queueName); } - AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool); + AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool); - consumer.init(slowConsumerDetectionListener); - consumerMap.put(d, consumer); + consumer.init(slowConsumerDetectionListener, idGenerator.generateID()); + consumersList.add(consumer); consumers.put(consumer.getNativeId(), consumer); } - connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap); - // TODO: This is wrong. We should only start when the client starts + return consumersList; + } + + public void start() { + coreSession.start(); started.set(true); + } + // rename actualDest to destination @Override public void afterDelivery() throws Exception { @@ -166,7 +180,7 @@ public class AMQSession implements SessionCallback { @Override public void browserFinished(ServerConsumer consumer) { - AMQConsumer theConsumer = ((AMQServerConsumer)consumer).getAmqConsumer(); + AMQConsumer theConsumer = ((AMQServerConsumer) consumer).getAmqConsumer(); if (theConsumer != null) { theConsumer.browseFinished(); } @@ -235,7 +249,6 @@ public class AMQSession implements SessionCallback { } - public void send(final ProducerInfo producerInfo, final Message messageSend, boolean sendProducerAck) throws Exception { @@ -286,7 +299,7 @@ public class AMQSession implements SessionCallback { else { final Connection transportConnection = connection.getTransportConnection(); -// new Exception("Setting to false").printStackTrace(); + // new Exception("Setting to false").printStackTrace(); if (transportConnection == null) { // I don't think this could happen, but just in case, avoiding races @@ -301,7 +314,6 @@ public class AMQSession implements SessionCallback { } } - internalSend(actualDestinations, originalCoreMsg, runnable); } @@ -340,7 +352,6 @@ public class AMQSession implements SessionCallback { } } - for (int i = 0; i < actualDestinations.length; i++) { ServerMessage coreMsg = originalCoreMsg.copy();
