http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/NamedLiveNodeLocatorForReplication.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/NamedLiveNodeLocatorForReplication.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/NamedLiveNodeLocatorForReplication.java index 0980ec1..98e84e2 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/NamedLiveNodeLocatorForReplication.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/NamedLiveNodeLocatorForReplication.java @@ -29,7 +29,7 @@ import org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum; * NamedLiveNodeLocatorForReplication looks for a live server in the cluster with a specific backupGroupName * * @author <a href="mailto:[email protected]">Andy Taylor</a> - * @see org.hornetq.core.server.cluster.ha.HAPolicy#getBackupGroupName() + * @see org.hornetq.core.server.cluster.ha.HAPolicy#getGroupName() */ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/PostOfficeJournalLoader.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/PostOfficeJournalLoader.java index ced98fe..8fba846 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/PostOfficeJournalLoader.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/PostOfficeJournalLoader.java @@ -122,7 +122,7 @@ public class PostOfficeJournalLoader implements JournalLoader if (isTopicIdentification) { - long tx = storageManager.generateUniqueID(); + long tx = storageManager.generateID(); storageManager.deleteQueueBinding(tx, queueBindingInfo.getId()); storageManager.commitBindings(tx); continue; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java index 030c596..f4e8fe9 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java @@ -14,6 +14,7 @@ package org.hornetq.core.server.impl; import java.io.PrintWriter; import java.io.StringWriter; +import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -33,10 +34,13 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import 
org.hornetq.api.core.Message; import org.hornetq.api.core.Pair; import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.management.CoreNotificationType; +import org.hornetq.api.core.management.ManagementHelper; import org.hornetq.core.filter.Filter; import org.hornetq.core.journal.IOAsyncTask; import org.hornetq.core.message.impl.MessageImpl; @@ -47,8 +51,11 @@ import org.hornetq.core.postoffice.Binding; import org.hornetq.core.postoffice.Bindings; import org.hornetq.core.postoffice.DuplicateIDCache; import org.hornetq.core.postoffice.PostOffice; +import org.hornetq.core.postoffice.impl.PostOfficeImpl; +import org.hornetq.core.remoting.server.RemotingService; import org.hornetq.core.server.Consumer; import org.hornetq.core.server.HandleStatus; +import org.hornetq.core.server.HornetQMessageBundle; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.MessageReference; @@ -58,14 +65,17 @@ import org.hornetq.core.server.ScheduledDeliveryHandler; import org.hornetq.core.server.ServerMessage; import org.hornetq.core.server.cluster.RemoteQueueBinding; import org.hornetq.core.server.cluster.impl.Redistributor; +import org.hornetq.core.server.management.ManagementService; +import org.hornetq.core.server.management.Notification; import org.hornetq.core.settings.HierarchicalRepository; import org.hornetq.core.settings.HierarchicalRepositoryChangeListener; import org.hornetq.core.settings.impl.AddressSettings; +import org.hornetq.core.settings.impl.SlowConsumerPolicy; import org.hornetq.core.transaction.Transaction; -import org.hornetq.core.transaction.TransactionOperationAbstract; import org.hornetq.core.transaction.TransactionPropertyIndexes; import org.hornetq.core.transaction.impl.BindingsTransactionImpl; import org.hornetq.core.transaction.impl.TransactionImpl; +import org.hornetq.spi.core.protocol.RemotingConnection; import org.hornetq.utils.ConcurrentHashSet; import 
org.hornetq.utils.FutureLatch; import org.hornetq.utils.LinkedListIterator; @@ -73,6 +83,7 @@ import org.hornetq.utils.PriorityLinkedList; import org.hornetq.utils.PriorityLinkedListImpl; import org.hornetq.utils.ReferenceCounter; import org.hornetq.utils.ReusableLatch; +import org.hornetq.utils.TypedProperties; /** * Implementation of a Queue @@ -149,6 +160,8 @@ public class QueueImpl implements Queue private long messagesAdded; + private long messagesAcknowledged; + protected final AtomicInteger deliveringCount = new AtomicInteger(0); private boolean paused; @@ -204,6 +217,14 @@ public class QueueImpl implements Queue private final ReusableLatch deliveriesInTransit = new ReusableLatch(0); + private AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis()); + + private AtomicLong messagesAddedSnapshot = new AtomicLong(0); + + private ScheduledFuture slowConsumerReaperFuture; + + private SlowConsumerReaperRunnable slowConsumerReaperRunnable; + /** * This is to avoid multi-thread races on calculating direct delivery, * to guarantee ordering will be always be correct @@ -982,21 +1003,6 @@ public class QueueImpl implements Queue public long getMessageCount() { - return getMessageCount(FLUSH_TIMEOUT); - } - - public long getMessageCount(final long timeout) - { - if (timeout > 0) - { - internalFlushExecutor(timeout); - } - return getInstantMessageCount(); - } - - - public long getInstantMessageCount() - { synchronized (this) { if (pageSubscription != null) @@ -1068,6 +1074,8 @@ public class QueueImpl implements Queue postAcknowledge(ref); } + messagesAcknowledged++; + } public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception @@ -1093,6 +1101,8 @@ public class QueueImpl implements Queue getRefsOperation(tx).addAck(ref); } + + messagesAcknowledged++; } public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception @@ -1108,6 +1118,8 @@ public class QueueImpl implements Queue // 
https://issues.jboss.org/browse/HORNETQ-609 incDelivering(); + + messagesAcknowledged++; } private RefsOperation getRefsOperation(final Transaction tx) @@ -1123,7 +1135,7 @@ public class QueueImpl implements Queue if (oper == null) { - oper = new RefsOperation(); + oper = tx.createRefsOperation(this); tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper); @@ -1209,17 +1221,6 @@ public class QueueImpl implements Queue public long getMessagesAdded() { - return getMessagesAdded(FLUSH_TIMEOUT); - } - - public long getMessagesAdded(final long timeout) - { - if (timeout > 0) internalFlushExecutor(timeout); - return getInstantMessagesAdded(); - } - - public synchronized long getInstantMessagesAdded() - { if (pageSubscription != null) { return messagesAdded + pageSubscription.getCounter().getValue() - pagedReferences.get(); @@ -1230,6 +1231,10 @@ public class QueueImpl implements Queue } } + public long getMessagesAcknowledged() + { + return messagesAcknowledged; + } public int deleteAllReferences() throws Exception { @@ -1410,6 +1415,12 @@ public class QueueImpl implements Queue } } + if (!deleted) + { + // Look in scheduled deliveries + deleted = scheduledDeliveryHandler.removeReferenceWithID(messageID) != null ? 
true : false; + } + tx.commit(); return deleted; @@ -1456,6 +1467,11 @@ public class QueueImpl implements Queue tx.setContainsPersistent(); } + if (slowConsumerReaperFuture != null) + { + slowConsumerReaperFuture.cancel(false); + } + tx.commit(); } catch (Exception e) @@ -2545,7 +2561,7 @@ public class QueueImpl implements Queue and original message id */ - long newID = storageManager.generateUniqueID(); + long newID = storageManager.generateID(); ServerMessage copy = message.makeCopyForExpiryOrDLA(newID, ref, expiry, copyOriginalHeaders); @@ -2578,7 +2594,7 @@ public class QueueImpl implements Queue } - private void sendToDeadLetterAddress(final MessageReference ref) throws Exception + public void sendToDeadLetterAddress(final MessageReference ref) throws Exception { sendToDeadLetterAddress(ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress()); } @@ -2804,8 +2820,7 @@ public class QueueImpl implements Queue return consumerListClone; } - // Protected as testcases may change this behaviour - protected void postAcknowledge(final MessageReference ref) + public void postAcknowledge(final MessageReference ref) { QueueImpl queue = (QueueImpl) ref.getQueue(); @@ -2889,6 +2904,21 @@ public class QueueImpl implements Queue messagesAdded = 0; } + public synchronized void resetMessagesAcknowledged() + { + messagesAcknowledged = 0; + } + + public float getRate() + { + float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f); + if (timeSlice == 0) + { + messagesAddedSnapshot.getAndSet(messagesAdded); + return 0.0f; + } + return BigDecimal.valueOf((messagesAdded - messagesAddedSnapshot.getAndSet(messagesAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue(); + } // Inner classes // -------------------------------------------------------------------------- @@ -2906,189 +2936,6 @@ public class QueueImpl implements Queue } - public final class RefsOperation extends 
TransactionOperationAbstract - { - List<MessageReference> refsToAck = new ArrayList<MessageReference>(); - - List<ServerMessage> pagedMessagesToPostACK = null; - - /** - * It will ignore redelivery check, which is used during consumer.close - * to not perform reschedule redelivery check - */ - protected boolean ignoreRedeliveryCheck = false; - - - // once turned on, we shouldn't turn it off, that's why no parameters - public void setIgnoreRedeliveryCheck() - { - ignoreRedeliveryCheck = true; - } - - synchronized void addAck(final MessageReference ref) - { - refsToAck.add(ref); - if (ref.isPaged()) - { - if (pagedMessagesToPostACK == null) - { - pagedMessagesToPostACK = new ArrayList<ServerMessage>(); - } - pagedMessagesToPostACK.add(ref.getMessage()); - } - } - - @Override - public void afterRollback(final Transaction tx) - { - Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<QueueImpl, LinkedList<MessageReference>>(); - - long timeBase = System.currentTimeMillis(); - - //add any already acked refs, this means that they have been transferred via a producer.send() and have no - // previous state persisted. 
- List<MessageReference> ackedRefs = new ArrayList<>(); - - for (MessageReference ref : refsToAck) - { - ref.setConsumerId(null); - - if (HornetQServerLogger.LOGGER.isTraceEnabled()) - { - HornetQServerLogger.LOGGER.trace("rolling back " + ref); - } - try - { - if (ref.isAlreadyAcked()) - { - ackedRefs.add(ref); - } - // if ignore redelivery check, we just perform redelivery straight - if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) - { - LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue()); - - if (toCancel == null) - { - toCancel = new LinkedList<MessageReference>(); - - queueMap.put((QueueImpl) ref.getQueue(), toCancel); - } - - toCancel.addFirst(ref); - } - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.errorCheckingDLQ(e); - } - } - - for (Map.Entry<QueueImpl, LinkedList<MessageReference>> entry : queueMap.entrySet()) - { - LinkedList<MessageReference> refs = entry.getValue(); - - QueueImpl queue = entry.getKey(); - - synchronized (queue) - { - queue.postRollback(refs); - } - } - - if (!ackedRefs.isEmpty()) - { - //since pre acked refs have no previous state we need to actually create this by storing the message and the - //message references - try - { - Transaction ackedTX = new TransactionImpl(storageManager); - for (MessageReference ref : ackedRefs) - { - ServerMessage message = ref.getMessage(); - if (message.isDurable()) - { - int durableRefCount = message.incrementDurableRefCount(); - - if (durableRefCount == 1) - { - storageManager.storeMessageTransactional(ackedTX.getID(), message); - } - Queue queue = ref.getQueue(); - - storageManager.storeReferenceTransactional(ackedTX.getID(), queue.getID(), message.getMessageID()); - - ackedTX.setContainsPersistent(); - } - - message.incrementRefCount(); - } - ackedTX.commit(true); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - } - - @Override - public void afterCommit(final Transaction tx) - { - for (MessageReference ref : refsToAck) - { - 
synchronized (ref.getQueue()) - { - postAcknowledge(ref); - } - } - - if (pagedMessagesToPostACK != null) - { - for (ServerMessage msg : pagedMessagesToPostACK) - { - try - { - msg.decrementRefCount(); - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.warn(e.getMessage(), e); - } - } - } - } - - @Override - public synchronized List<MessageReference> getRelatedMessageReferences() - { - List<MessageReference> listRet = new LinkedList<MessageReference>(); - listRet.addAll(listRet); - return listRet; - } - - @Override - public synchronized List<MessageReference> getListOnConsumer(long consumerID) - { - List<MessageReference> list = new LinkedList<MessageReference>(); - for (MessageReference ref : refsToAck) - { - if (ref.getConsumerId() != null && ref.getConsumerId().equals(consumerID)) - { - list.add(ref); - } - } - - return list; - } - - public List<MessageReference> getReferencesToAcknowledge() - { - return refsToAck; - } - - } - private class DelayedAddRedistributor implements Runnable { private final Executor executor1; @@ -3309,16 +3156,66 @@ public class QueueImpl implements Queue return deliveringCount.incrementAndGet(); } - private void decDelivering() + public void decDelivering() { deliveringCount.decrementAndGet(); } - private void configureExpiry(final SimpleString expiryAddressArgument) + private void configureExpiry(final AddressSettings settings) { - this.expiryAddress = expiryAddressArgument; + this.expiryAddress = settings == null ? 
null : settings.getExpiryAddress(); } + private void configureSlowConsumerReaper(final AddressSettings settings) + { + if (settings == null || settings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) + { + if (slowConsumerReaperFuture != null) + { + slowConsumerReaperFuture.cancel(false); + slowConsumerReaperFuture = null; + slowConsumerReaperRunnable = null; + if (HornetQServerLogger.LOGGER.isDebugEnabled()) + { + HornetQServerLogger.LOGGER.debug("Cancelled slow-consumer-reaper thread for queue \"" + getName() + "\""); + } + } + } + else + { + if (slowConsumerReaperRunnable == null) + { + scheduleSlowConsumerReaper(settings); + } + else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || + slowConsumerReaperRunnable.threshold != settings.getSlowConsumerThreshold() || + !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) + { + slowConsumerReaperFuture.cancel(false); + scheduleSlowConsumerReaper(settings); + } + } + } + + void scheduleSlowConsumerReaper(AddressSettings settings) + { + slowConsumerReaperRunnable = new SlowConsumerReaperRunnable(settings.getSlowConsumerCheckPeriod(), + settings.getSlowConsumerThreshold(), + settings.getSlowConsumerPolicy()); + + slowConsumerReaperFuture = scheduledExecutor.scheduleWithFixedDelay(slowConsumerReaperRunnable, + settings.getSlowConsumerCheckPeriod(), + settings.getSlowConsumerCheckPeriod(), + TimeUnit.SECONDS); + + if (HornetQServerLogger.LOGGER.isDebugEnabled()) + { + HornetQServerLogger.LOGGER.debug("Scheduled slow-consumer-reaper thread for queue \"" + getName() + + "\"; slow-consumer-check-period=" + settings.getSlowConsumerCheckPeriod() + + ", slow-consumer-threshold=" + settings.getSlowConsumerThreshold() + + ", slow-consumer-policy=" + settings.getSlowConsumerPolicy()); + } + } private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener { @@ -3326,13 +3223,103 @@ public class QueueImpl 
implements Queue public void onChange() { AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); - if (settings == null) + configureExpiry(settings); + configureSlowConsumerReaper(settings); + } + } + + private final class SlowConsumerReaperRunnable implements Runnable + { + private SlowConsumerPolicy policy; + private float threshold; + private long checkPeriod; + + public SlowConsumerReaperRunnable(long checkPeriod, float threshold, SlowConsumerPolicy policy) + { + this.checkPeriod = checkPeriod; + this.policy = policy; + this.threshold = threshold; + } + + @Override + public void run() + { + float queueRate = getRate(); + if (HornetQServerLogger.LOGGER.isDebugEnabled()) { - configureExpiry(null); + HornetQServerLogger.LOGGER.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second."); } - else + for (Consumer consumer : getConsumers()) { - configureExpiry(settings.getExpiryAddress()); + if (consumer instanceof ServerConsumerImpl) + { + ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; + float consumerRate = serverConsumer.getRate(); + if (queueRate < threshold) + { + if (HornetQServerLogger.LOGGER.isDebugEnabled()) + { + HornetQServerLogger.LOGGER.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. 
Skipping inspection of consumer."); + } + } + else if (consumerRate < threshold) + { + RemotingConnection connection = null; + RemotingService remotingService = ((PostOfficeImpl) postOffice).getServer().getRemotingService(); + + for (RemotingConnection potentialConnection : remotingService.getConnections()) + { + if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) + { + connection = potentialConnection; + } + } + + if (connection != null) + { + HornetQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate); + if (policy.equals(SlowConsumerPolicy.KILL)) + { + remotingService.removeConnection(connection.getID()); + connection.fail(HornetQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())); + } + else if (policy.equals(SlowConsumerPolicy.NOTIFY)) + { + TypedProperties props = new TypedProperties(); + + props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, getConsumerCount()); + + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address); + + if (connection != null) + { + props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(connection.getRemoteAddress())); + + if (connection.getID() != null) + { + props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(connection.getID().toString())); + } + } + + props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, serverConsumer.getID()); + + props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(serverConsumer.getSessionID())); + + Notification notification = new Notification(null, CoreNotificationType.CONSUMER_SLOW, props); + + ManagementService managementService = ((PostOfficeImpl) postOffice).getServer().getManagementService(); + try + { + managementService.sendNotification(notification); + } + catch (Exception e) 
+ { + HornetQServerLogger.LOGGER.failedToSendSlowConsumerNotification(notification, e); + } + } + } + } + } } } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/RefsOperation.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/RefsOperation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/RefsOperation.java new file mode 100644 index 0000000..8a7a87c --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/RefsOperation.java @@ -0,0 +1,218 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.server.impl; + +import org.hornetq.core.persistence.StorageManager; +import org.hornetq.core.server.HornetQServerLogger; +import org.hornetq.core.server.MessageReference; +import org.hornetq.core.server.Queue; +import org.hornetq.core.server.ServerMessage; +import org.hornetq.core.transaction.Transaction; +import org.hornetq.core.transaction.TransactionOperationAbstract; +import org.hornetq.core.transaction.impl.TransactionImpl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class RefsOperation extends TransactionOperationAbstract +{ + private final StorageManager storageManager; + private Queue queue; + List<MessageReference> refsToAck = new ArrayList<MessageReference>(); + + List<ServerMessage> pagedMessagesToPostACK = null; + + /** + * It will ignore redelivery check, which is used during consumer.close + * to not perform reschedule redelivery check + */ + protected boolean ignoreRedeliveryCheck = false; + + public RefsOperation(Queue queue, StorageManager storageManager) + { + this.queue = queue; + this.storageManager = storageManager; + } + + // once turned on, we shouldn't turn it off, that's why no parameters + public void setIgnoreRedeliveryCheck() + { + ignoreRedeliveryCheck = true; + } + + synchronized void addAck(final MessageReference ref) + { + refsToAck.add(ref); + if (ref.isPaged()) + { + if (pagedMessagesToPostACK == null) + { + pagedMessagesToPostACK = new ArrayList<ServerMessage>(); + } + pagedMessagesToPostACK.add(ref.getMessage()); + } + } + + @Override + public void afterRollback(final Transaction tx) + { + Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<QueueImpl, LinkedList<MessageReference>>(); + + long timeBase = System.currentTimeMillis(); + + //add any already acked refs, this means that they have been transferred via a producer.send() and have no + // previous state persisted. 
+ List<MessageReference> ackedRefs = new ArrayList<>(); + + for (MessageReference ref : refsToAck) + { + ref.setConsumerId(null); + + if (HornetQServerLogger.LOGGER.isTraceEnabled()) + { + HornetQServerLogger.LOGGER.trace("rolling back " + ref); + } + try + { + if (ref.isAlreadyAcked()) + { + ackedRefs.add(ref); + } + // if ignore redelivery check, we just perform redelivery straight + if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) + { + LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue()); + + if (toCancel == null) + { + toCancel = new LinkedList<MessageReference>(); + + queueMap.put((QueueImpl) ref.getQueue(), toCancel); + } + + toCancel.addFirst(ref); + } + } + catch (Exception e) + { + HornetQServerLogger.LOGGER.errorCheckingDLQ(e); + } + } + + for (Map.Entry<QueueImpl, LinkedList<MessageReference>> entry : queueMap.entrySet()) + { + LinkedList<MessageReference> refs = entry.getValue(); + + QueueImpl queue = entry.getKey(); + + synchronized (queue) + { + queue.postRollback(refs); + } + } + + if (!ackedRefs.isEmpty()) + { + //since pre acked refs have no previous state we need to actually create this by storing the message and the + //message references + try + { + Transaction ackedTX = new TransactionImpl(storageManager); + for (MessageReference ref : ackedRefs) + { + ServerMessage message = ref.getMessage(); + if (message.isDurable()) + { + int durableRefCount = message.incrementDurableRefCount(); + + if (durableRefCount == 1) + { + storageManager.storeMessageTransactional(ackedTX.getID(), message); + } + Queue queue = ref.getQueue(); + + storageManager.storeReferenceTransactional(ackedTX.getID(), queue.getID(), message.getMessageID()); + + ackedTX.setContainsPersistent(); + } + + message.incrementRefCount(); + } + ackedTX.commit(true); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + + @Override + public void afterCommit(final Transaction tx) + { + for (MessageReference ref : refsToAck) + { + 
synchronized (ref.getQueue()) + { + queue.postAcknowledge(ref); + } + } + + if (pagedMessagesToPostACK != null) + { + for (ServerMessage msg : pagedMessagesToPostACK) + { + try + { + msg.decrementRefCount(); + } + catch (Exception e) + { + HornetQServerLogger.LOGGER.warn(e.getMessage(), e); + } + } + } + } + + @Override + public synchronized List<MessageReference> getRelatedMessageReferences() + { + List<MessageReference> listRet = new LinkedList<MessageReference>(); + listRet.addAll(listRet); + return listRet; + } + + @Override + public synchronized List<MessageReference> getListOnConsumer(long consumerID) + { + List<MessageReference> list = new LinkedList<MessageReference>(); + for (MessageReference ref : refsToAck) + { + if (ref.getConsumerId() != null && ref.getConsumerId().equals(consumerID)) + { + list.add(ref); + } + } + + return list; + } + + public List<MessageReference> getReferencesToAcknowledge() + { + return refsToAck; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ReplicationError.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ReplicationError.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ReplicationError.java index f7625e6..7bcb040 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ReplicationError.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ReplicationError.java @@ -30,7 +30,7 @@ import org.hornetq.spi.core.protocol.RemotingConnection; * Stops the backup in case of an error at the start of Replication. * <p> * Using an interceptor for the task to avoid a server reference inside of the 'basic' channel-0 - * handler at {@link ClientSessionFactoryImpl#Channel0Handler}. As {@link ClientSessionFactoryImpl} + * handler at {@link org.hornetq.core.protocol.core.impl.HornetQClientProtocolManager.Channel0Handler}. 
As {@link org.hornetq.core.protocol.core.impl.HornetQClientProtocolManager} * is also shipped in the HQ-client JAR (which does not include {@link HornetQServer}). */ final class ReplicationError implements Interceptor http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ScaleDownHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ScaleDownHandler.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ScaleDownHandler.java index dd166f9..c7ce890 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ScaleDownHandler.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ScaleDownHandler.java @@ -80,6 +80,7 @@ public class ScaleDownHandler ClusterControl clusterControl = clusterController.connectToNodeInCluster((ClientSessionFactoryInternal) sessionFactory); clusterControl.authorize(); long num = scaleDownMessages(sessionFactory, targetNodeId); + HornetQServerLogger.LOGGER.info("Scaled down " + num + " messages total."); scaleDownTransactions(sessionFactory, resourceManager); scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress); clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId()); @@ -107,11 +108,8 @@ public class ScaleDownHandler boolean storeAndForward = false; if (address.toString().startsWith("sf.")) { - if (address.toString().endsWith(targetNodeId)) - { - // send messages in this queue to the original address - storeAndForward = true; - } + // these get special treatment later + storeAndForward = true; } // this means we haven't inspected this address before @@ -159,10 +157,72 @@ public class ScaleDownHandler // loop through every message of this queue while (bigLoopMessageIterator.hasNext()) { + MessageReference bigLoopRef = bigLoopMessageIterator.next(); + Message message = 
bigLoopRef.getMessage().copy(); + if (storeAndForward) { - MessageReference bigLoopRef = bigLoopMessageIterator.next(); - Message message = bigLoopRef.getMessage(); + if (address.toString().endsWith(targetNodeId)) + { + /* Here we are taking messages out of a store-and-forward queue and sending them to the corresponding + * address on the scale-down target server. However, we have to take the existing _HQ_ROUTE_TOsf.* + * property and put its value into the _HQ_ROUTE_TO property so the message is routed properly. + */ + + byte[] oldRouteToIDs = null; + + List<SimpleString> propertiesToRemove = new ArrayList<>(); + message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS); + for (SimpleString propName : message.getPropertyNames()) + { + if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) + { + if (propName.toString().endsWith(targetNodeId)) + { + oldRouteToIDs = message.getBytesProperty(propName); + } + propertiesToRemove.add(propName); + } + } + + for (SimpleString propertyToRemove : propertiesToRemove) + { + message.removeProperty(propertyToRemove); + } + + message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, oldRouteToIDs); + } + else + { + /* Here we are taking messages out of a store-and-forward queue and sending them to the corresponding + * store-and-forward address on the scale-down target server. In this case we use a special property + * for the queue ID so that the scale-down target server can route it appropriately. 
+ */ + byte[] oldRouteToIDs = null; + + List<SimpleString> propertiesToRemove = new ArrayList<>(); + message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS); + for (SimpleString propName : message.getPropertyNames()) + { + if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) + { + if (propName.toString().endsWith(address.toString().substring(address.toString().lastIndexOf(".")))) + { + oldRouteToIDs = message.getBytesProperty(propName); + } + propertiesToRemove.add(propName); + } + } + + for (SimpleString propertyToRemove : propertiesToRemove) + { + message.removeProperty(propertyToRemove); + } + + message.putBytesProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS, oldRouteToIDs); + } + + HornetQServerLogger.LOGGER.debug("Scaling down message " + message + " from " + address + " to " + message.getAddress() + " on node " + targetNodeId); producer.send(message.getAddress(), message); messageCount++; bigLoopQueue.deleteReference(message.getMessageID()); @@ -171,8 +231,7 @@ public class ScaleDownHandler { List<Queue> queuesWithMessage = new ArrayList<>(); queuesWithMessage.add(bigLoopQueue); - MessageReference bigLoopRef = bigLoopMessageIterator.next(); - long messageId = bigLoopRef.getMessage().getMessageID(); + long messageId = message.getMessageID(); getQueuesWithMessage(store, queues, queueIterators, checkedQueues, bigLoopQueue, queuesWithMessage, bigLoopRef, messageId); @@ -200,9 +259,8 @@ public class ScaleDownHandler } logMessage.delete(logMessage.length() - 2, logMessage.length()); // trim off the trailing comma and space - HornetQServerLogger.LOGGER.info(logMessage.append(" on address ").append(address)); + HornetQServerLogger.LOGGER.debug(logMessage.append(" on address ").append(address)); - Message message = bigLoopRef.getMessage(); message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); //we need this incase we are sending back to the source server of the message, this basically //acts like the bridge and ignores dup detection @@ -258,6 +316,7 @@ 
public class ScaleDownHandler Map<String, Long> queueIDs = new HashMap<>(); for (Xid xid : preparedTransactions) { + HornetQServerLogger.LOGGER.debug("Scaling down transaction: " + xid); Transaction transaction = resourceManager.getTransaction(xid); session.start(xid, XAResource.TMNOFLAGS); List<TransactionOperation> allOperations = transaction.getAllOperations(); @@ -293,9 +352,9 @@ public class ScaleDownHandler queueIds.getA().add(queueID); } } - else if (operation instanceof QueueImpl.RefsOperation) + else if (operation instanceof RefsOperation) { - QueueImpl.RefsOperation refsOperation = (QueueImpl.RefsOperation) operation; + RefsOperation refsOperation = (RefsOperation) operation; List<MessageReference> refs = refsOperation.getReferencesToAcknowledge(); for (MessageReference ref : refs) { @@ -456,6 +515,7 @@ public class ScaleDownHandler if (queueID == -1) { session.createQueue(addressName, queue.getName(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.isDurable()); + HornetQServerLogger.LOGGER.debug("Failed to get queue ID, creating queue [addressName=" + addressName + ", queueName=" + queue.getName() + ", filter=" + (queue.getFilter() == null ? 
"" : queue.getFilter().getFilterString()) + ", durable=" + queue.isDurable() + "]"); queueID = getQueueID(session, queue.getName()); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java index 2bc2e26..2d1f8a3 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java @@ -12,12 +12,14 @@ */ package org.hornetq.core.server.impl; +import java.math.BigDecimal; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -26,8 +28,8 @@ import org.hornetq.api.core.HornetQBuffers; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.HornetQIllegalStateException; import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ManagementHelper; -import org.hornetq.api.core.management.NotificationType; import org.hornetq.core.client.impl.ClientConsumerImpl; import org.hornetq.core.filter.Filter; import org.hornetq.core.message.BodyEncoder; @@ -73,7 +75,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener private final long id; - private final Queue messageQueue; + protected final Queue messageQueue; private final Filter filter; @@ -85,6 +87,8 @@ public class 
ServerConsumerImpl implements ServerConsumer, ReadyListener private final boolean supportLargeMessage; + private Object protocolContext; + /** * We get a readLock when a message is handled, and return the readLock when the message is finally delivered * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished @@ -109,13 +113,13 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener */ private final boolean browseOnly; - private BrowserDeliverer browserDeliverer; + protected BrowserDeliverer browserDeliverer; private final boolean strictUpdateDeliveryCount; private final StorageManager storageManager; - private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>(); + protected final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>(); private final SessionCallback callback; @@ -135,6 +139,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener private final long creationTime; + private AtomicLong consumerRateCheckTime = new AtomicLong(System.currentTimeMillis()); + + private AtomicLong messageConsumedSnapshot = new AtomicLong(0); + + private long acks; + // Constructors --------------------------------------------------------------------------------- public ServerConsumerImpl(final long id, @@ -223,6 +233,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener // ServerConsumer implementation // ---------------------------------------------------------------------- + public Object getProtocolContext() + { + return protocolContext; + } + + public void setProtocolContext(Object protocolContext) + { + this.protocolContext = protocolContext; + } + public long getID() { return id; @@ -266,7 +286,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener public HandleStatus handle(final MessageReference ref) throws Exception { - if (availableCredits != null 
&& availableCredits.get() <= 0) + if (callback != null && !callback.hasCredits(this) || availableCredits != null && availableCredits.get() <= 0) { if (HornetQServerLogger.LOGGER.isDebugEnabled()) { @@ -408,6 +428,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener return filter; } + @Override public void close(final boolean failed) throws Exception { callback.removeReadyListener(this); @@ -421,16 +442,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener del.finish(); } - if (browseOnly) - { - browserDeliverer.close(); - } - else - { - messageQueue.removeConsumer(this); - } - - session.removeConsumer(id); + removeItself(); LinkedList<MessageReference> refs = cancelRefs(failed, false, null); @@ -472,12 +484,27 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(session.getName())); - Notification notification = new Notification(null, NotificationType.CONSUMER_CLOSED, props); + Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CLOSED, props); managementService.sendNotification(notification); } } + @Override + public void removeItself() throws Exception + { + if (browseOnly) + { + browserDeliverer.close(); + } + else + { + messageQueue.removeConsumer(this); + } + + session.removeConsumer(id); + } + /** * Prompt delivery and send a "forced delivery" message to the consumer. 
* <p/> @@ -513,12 +540,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener } else { - ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50); + ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50); forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); forcedDeliveryMessage.setAddress(messageQueue.getName()); - callback.sendMessage(forcedDeliveryMessage, id, 0); + callback.sendMessage(forcedDeliveryMessage, ServerConsumerImpl.this, 0); } } } @@ -565,7 +592,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener } if (performACK) { - acknowledge(false, tx, ref.getMessage().getMessageID()); + acknowledge(tx, ref.getMessage().getMessageID()); performACK = false; } @@ -655,7 +682,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener } } - public void receiveCredits(final int credits) throws Exception + public void receiveCredits(final int credits) { if (credits == -1) { @@ -705,7 +732,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener return messageQueue; } - public void acknowledge(final boolean autoCommitAcks, Transaction tx, final long messageID) throws Exception + public void acknowledge(Transaction tx, final long messageID) throws Exception { if (browseOnly) { @@ -720,7 +747,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener boolean startedTransaction = false; - if (tx == null || autoCommitAcks) + if (tx == null) { startedTransaction = true; tx = new TransactionImpl(storageManager); @@ -745,6 +772,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener } ref.getQueue().acknowledge(tx, ref); + acks++; } while (ref.getMessage().getMessageID() != messageID); @@ -781,7 +809,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener } } - public void individualAcknowledge(final 
boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception + public void individualAcknowledge(final Transaction tx, final long messageID) throws Exception { if (browseOnly) { @@ -795,7 +823,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener throw new IllegalStateException("Cannot find ref to ack " + messageID); } - if (autoCommitAcks) + if (tx == null) { ref.getQueue().acknowledge(ref); } @@ -803,6 +831,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ref.getQueue().acknowledge(tx, ref); } + acks++; } public void individualCancel(final long messageID, boolean failed) throws Exception @@ -896,12 +925,23 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener @Override public void disconnect() { - callback.disconnect(id, getQueue().getName().toString()); + callback.disconnect(this, getQueue().getName().toString()); + } + + public float getRate() + { + float timeSlice = ((System.currentTimeMillis() - consumerRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f); + if (timeSlice == 0) + { + messageConsumedSnapshot.getAndSet(acks); + return 0.0f; + } + return BigDecimal.valueOf((acks - messageConsumedSnapshot.getAndSet(acks)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue(); } // Private -------------------------------------------------------------------------------------- - private void promptDelivery() + public void promptDelivery() { // largeMessageDeliverer is always set inside a lock // if we don't acquire a lock, we will have NPE eventually @@ -938,7 +978,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener */ private void deliverStandardMessage(final MessageReference ref, final ServerMessage message) { - int packetSize = callback.sendMessage(message, id, ref.getDeliveryCount()); + int packetSize = callback.sendMessage(message, ServerConsumerImpl.this, ref.getDeliveryCount()); if (availableCredits != null) { @@ 
-1040,7 +1080,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener sentInitialPacket = true; int packetSize = callback.sendLargeMessage(largeMessage, - id, + ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); @@ -1088,7 +1128,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener byte[] body = bodyBuffer.toByteBuffer().array(); - int packetSize = callback.sendLargeMessageContinuation(id, + int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false); @@ -1166,16 +1206,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener } } - private class BrowserDeliverer implements Runnable + protected class BrowserDeliverer implements Runnable { - private MessageReference current = null; + protected MessageReference current = null; public BrowserDeliverer(final LinkedListIterator<MessageReference> iterator) { this.iterator = iterator; } - private final LinkedListIterator<MessageReference> iterator; + public final LinkedListIterator<MessageReference> iterator; public synchronized void close() { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerMessageImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerMessageImpl.java index 54899ad..e780957 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerMessageImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerMessageImpl.java @@ -102,9 +102,10 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage return true; } - public void setMessageID(final long id) + public ServerMessageImpl setMessageID(final 
long id) { messageID = id; + return this; } public MessageReference createReference(final Queue queue) @@ -247,30 +248,27 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage @Override public void setOriginalHeaders(final ServerMessage other, final MessageReference originalReference, final boolean expiry) { + SimpleString originalQueue = other.getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE); + + if (originalQueue != null) + { + putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue); + } + else if (originalReference != null) + { + putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalReference.getQueue().getName()); + } + if (other.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) { putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS)); - SimpleString originalQueue = other.getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE); - - if (originalQueue != null) - { - putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue); - } - putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getLongProperty(Message.HDR_ORIG_MESSAGE_ID)); } else { putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getAddress()); - /** - * This could be null in some DLA cases since the message wasn't routed yet - */ - if (originalReference != null) - { - putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalReference.getQueue().getName()); - } putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getMessageID()); } @@ -328,9 +326,6 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this); } - // FIXME - this is stuff that is only used in large messages - - // This is only valid on the client side - why is it here? 
public InputStream getBodyInputStream() { return null; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java index fb9a637..1083dfb 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java @@ -26,6 +26,7 @@ package org.hornetq.core.server.impl; import javax.transaction.xa.XAException; import javax.transaction.xa.Xid; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -42,6 +43,7 @@ import org.hornetq.api.core.HornetQNonExistentQueueException; import org.hornetq.api.core.Message; import org.hornetq.api.core.Pair; import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ManagementHelper; import org.hornetq.core.client.impl.ClientMessageImpl; import org.hornetq.core.exception.HornetQXAException; @@ -78,6 +80,7 @@ import org.hornetq.core.server.management.Notification; import org.hornetq.core.transaction.ResourceManager; import org.hornetq.core.transaction.Transaction; import org.hornetq.core.transaction.Transaction.State; +import org.hornetq.core.transaction.TransactionFactory; import org.hornetq.core.transaction.TransactionOperationAbstract; import org.hornetq.core.transaction.TransactionPropertyIndexes; import org.hornetq.core.transaction.impl.TransactionImpl; @@ -88,8 +91,6 @@ import org.hornetq.utils.UUID; import org.hornetq.utils.json.JSONArray; import org.hornetq.utils.json.JSONObject; -import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED; - /** * Server side 
Session implementation * @@ -108,29 +109,29 @@ public class ServerSessionImpl implements ServerSession, FailureListener // Attributes ---------------------------------------------------------------------------- - private final String username; + protected final String username; - private final String password; + protected final String password; private final int minLargeMessageSize; - private final boolean autoCommitSends; + protected boolean autoCommitSends; - private final boolean autoCommitAcks; + protected boolean autoCommitAcks; - private final boolean preAcknowledge; + protected final boolean preAcknowledge; - private final boolean strictUpdateDeliveryCount; + protected final boolean strictUpdateDeliveryCount; - private final RemotingConnection remotingConnection; + protected final RemotingConnection remotingConnection; - private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>(); + protected final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>(); - private Transaction tx; + protected Transaction tx; - private final boolean xa; + protected boolean xa; - private final StorageManager storageManager; + protected final StorageManager storageManager; private final ResourceManager resourceManager; @@ -138,24 +139,24 @@ public class ServerSessionImpl implements ServerSession, FailureListener private final SecurityStore securityStore; - private final ManagementService managementService; + protected final ManagementService managementService; - private volatile boolean started = false; + protected volatile boolean started = false; - private final Map<SimpleString, TempQueueCleanerUpper> tempQueueCleannerUppers = new HashMap<SimpleString, TempQueueCleanerUpper>(); + protected final Map<SimpleString, TempQueueCleanerUpper> tempQueueCleannerUppers = new HashMap<SimpleString, TempQueueCleanerUpper>(); - private final String name; + protected final String name; - private final HornetQServer server; + 
protected final HornetQServer server; private final SimpleString managementAddress; // The current currentLargeMessage being processed private volatile LargeServerMessage currentLargeMessage; - private final RoutingContext routingContext = new RoutingContextImpl(null); + protected final RoutingContext routingContext = new RoutingContextImpl(null); - private final SessionCallback callback; + protected final SessionCallback callback; private volatile SimpleString defaultAddress; @@ -166,7 +167,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener private final OperationContext context; // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here - private final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>(); + protected final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>(); private final long creationTime = System.currentTimeMillis(); @@ -178,8 +179,34 @@ public class ServerSessionImpl implements ServerSession, FailureListener // concurrently. private volatile boolean closed = false; + private final TransactionFactory transactionFactory; + // Constructors --------------------------------------------------------------------------------- + //create an 'empty' session. 
Only used by AMQServerSession + //in order to check username and password + protected ServerSessionImpl(String username, String password) + { + this.username = username; + this.password = password; + + this.transactionFactory = null; + this.strictUpdateDeliveryCount = false; + this.storageManager = null; + this.server = null; + this.securityStore = null; + this.resourceManager = null; + this.remotingConnection = null; + this.preAcknowledge = false; + this.postOffice = null; + this.name = null; + this.minLargeMessageSize = 0; + this.managementService = null; + this.managementAddress = null; + this.context = null; + this.callback = null; + } + public ServerSessionImpl(final String name, final String username, final String password, @@ -201,6 +228,36 @@ public class ServerSessionImpl implements ServerSession, FailureListener final SessionCallback callback, final OperationContext context) throws Exception { + this(name, username, password, minLargeMessageSize, + autoCommitSends, autoCommitAcks, preAcknowledge, + strictUpdateDeliveryCount, xa, remotingConnection, + storageManager, postOffice, resourceManager, securityStore, + managementService, server, managementAddress, defaultAddress, + callback, context, null); + } + + public ServerSessionImpl(final String name, + final String username, + final String password, + final int minLargeMessageSize, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge, + final boolean strictUpdateDeliveryCount, + final boolean xa, + final RemotingConnection remotingConnection, + final StorageManager storageManager, + final PostOffice postOffice, + final ResourceManager resourceManager, + final SecurityStore securityStore, + final ManagementService managementService, + final HornetQServer server, + final SimpleString managementAddress, + final SimpleString defaultAddress, + final SessionCallback callback, + final OperationContext context, + TransactionFactory transactionFactory) throws Exception + 
{ this.username = username; this.password = password; @@ -242,6 +299,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener remotingConnection.addFailureListener(this); this.context = context; + + if (transactionFactory == null) + { + this.transactionFactory = new DefaultTransactionFactory(); + } + else + { + this.transactionFactory = transactionFactory; + } + if (!xa) { tx = newTransaction(); @@ -289,15 +356,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener return Collections.unmodifiableSet(consumersClone); } - public void removeConsumer(final long consumerID) throws Exception + public boolean removeConsumer(final long consumerID) throws Exception { - if (consumers.remove(consumerID) == null) - { - throw new IllegalStateException("Cannot find consumer with id " + consumerID + " to remove"); - } + return consumers.remove(consumerID) != null; } - private void doClose(final boolean failed) throws Exception + protected void doClose(final boolean failed) throws Exception { synchronized (this) { @@ -316,14 +380,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener HornetQServerLogger.LOGGER.warn(e.getMessage(), e); } } - - server.removeSession(name); - - remotingConnection.removeFailureListener(this); - - callback.closed(); - - closed = true; } //putting closing of consumers outside the sync block @@ -332,7 +388,22 @@ public class ServerSessionImpl implements ServerSession, FailureListener for (ServerConsumer consumer : consumersClone) { - consumer.close(failed); + try + { + consumer.close(failed); + } + catch (Throwable e) + { + HornetQServerLogger.LOGGER.warn(e.getMessage(), e); + try + { + consumer.removeItself(); + } + catch (Throwable e2) + { + HornetQServerLogger.LOGGER.warn(e2.getMessage(), e2); + } + } } consumers.clear(); @@ -348,22 +419,34 @@ public class ServerSessionImpl implements ServerSession, FailureListener HornetQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); } } + 
+ + synchronized (this) + { + server.removeSession(name); + + remotingConnection.removeFailureListener(this); + + callback.closed(); + + closed = true; + } } - public void createConsumer(final long consumerID, - final SimpleString queueName, - final SimpleString filterString, - final boolean browseOnly) throws Exception + public ServerConsumer createConsumer(final long consumerID, + final SimpleString queueName, + final SimpleString filterString, + final boolean browseOnly) throws Exception { - this.createConsumer(consumerID, queueName, filterString, browseOnly, true, null); + return this.createConsumer(consumerID, queueName, filterString, browseOnly, true, null); } - public void createConsumer(final long consumerID, - final SimpleString queueName, - final SimpleString filterString, - final boolean browseOnly, - final boolean supportLargeMessage, - final Integer credits) throws Exception + public ServerConsumer createConsumer(final long consumerID, + final SimpleString queueName, + final SimpleString filterString, + final boolean browseOnly, + final boolean supportLargeMessage, + final Integer credits) throws Exception { Binding binding = postOffice.getBinding(queueName); @@ -376,7 +459,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener Filter filter = FilterImpl.createFilter(filterString); - ServerConsumer consumer = new ServerConsumerImpl(consumerID, + ServerConsumer consumer = newConsumer(consumerID, this, (QueueBinding) binding, filter, @@ -419,7 +502,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); } - Notification notification = new Notification(null, CONSUMER_CREATED, props); + Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props); if (HornetQServerLogger.LOGGER.isDebugEnabled()) { @@ -431,6 +514,31 @@ public class ServerSessionImpl implements ServerSession, FailureListener 
managementService.sendNotification(notification); } + + return consumer; + } + + protected ServerConsumer newConsumer(long consumerID, + ServerSessionImpl serverSessionImpl, QueueBinding binding, + Filter filter, boolean started2, boolean browseOnly, + StorageManager storageManager2, SessionCallback callback2, + boolean preAcknowledge2, boolean strictUpdateDeliveryCount2, + ManagementService managementService2, boolean supportLargeMessage, + Integer credits) throws Exception + { + return new ServerConsumerImpl(consumerID, + this, + (QueueBinding) binding, + filter, + started, + browseOnly, + storageManager, + callback, + preAcknowledge, + strictUpdateDeliveryCount, + managementService, + supportLargeMessage, + credits); } public void createQueue(final SimpleString address, @@ -492,13 +600,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener return remotingConnection; } - private static class TempQueueCleanerUpper implements CloseListener, FailureListener + public static class TempQueueCleanerUpper implements CloseListener, FailureListener { private final SimpleString bindingName; private final HornetQServer server; - TempQueueCleanerUpper(final HornetQServer server, final SimpleString bindingName) + public TempQueueCleanerUpper(final HornetQServer server, final SimpleString bindingName) { this.server = server; @@ -599,7 +707,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener queue.isTemporary(), filterString, queue.getConsumerCount(), - queue.getMessageCount(QueueImpl.DELIVERY_TIMEOUT)); + queue.getMessageCount()); } // make an exception for the management address (see HORNETQ-29) else if (name.equals(managementAddress)) @@ -668,12 +776,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener // have these messages to be stuck on the limbo until the server is restarted // The tx has already timed out, so we need to ack and rollback immediately Transaction newTX = newTransaction(); - 
consumer.acknowledge(autoCommitAcks, newTX, messageID); + consumer.acknowledge(newTX, messageID); newTX.rollback(); } else { - consumer.acknowledge(autoCommitAcks, tx, messageID); + consumer.acknowledge(autoCommitAcks ? null : tx, messageID); } } @@ -681,23 +789,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener { ServerConsumer consumer = consumers.get(consumerID); - if (this.xa && tx == null) - { - throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state"); - } - if (tx != null && tx.getState() == State.ROLLEDBACK) { // JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we will just // have these messages to be stuck on the limbo until the server is restarted // The tx has already timed out, so we need to ack and rollback immediately Transaction newTX = newTransaction(); - consumer.individualAcknowledge(autoCommitAcks, tx, messageID); + consumer.individualAcknowledge(tx, messageID); newTX.rollback(); } else { - consumer.individualAcknowledge(autoCommitAcks, tx, messageID); + consumer.individualAcknowledge(autoCommitAcks ? 
null : tx, messageID); } } @@ -732,11 +835,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener } try { - tx.commit(); + if (tx != null) + { + tx.commit(); + } } finally { - tx = newTransaction(); + if (xa) + { + tx = null; + } + else + { + tx = newTransaction(); + } } } @@ -774,18 +887,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener /** * @return */ - private TransactionImpl newTransaction() + protected Transaction newTransaction() { - return new TransactionImpl(storageManager, timeoutSeconds); + return transactionFactory.newTransaction(null, storageManager, timeoutSeconds); } /** * @param xid * @return */ - private TransactionImpl newTransaction(final Xid xid) + private Transaction newTransaction(final Xid xid) { - return new TransactionImpl(xid, storageManager, timeoutSeconds); + return transactionFactory.newTransaction(xid, storageManager, timeoutSeconds); } public synchronized void xaCommit(final Xid xid, final boolean onePhase) throws Exception @@ -1306,13 +1419,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener @Override public Transaction getCurrentTransaction() { + if (tx == null) + { + tx = newTransaction(); + } return tx; } public void sendLarge(final MessageInternal message) throws Exception { // need to create the LargeMessage before continue - long id = storageManager.generateUniqueID(); + long id = storageManager.generateID(); LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message); @@ -1335,7 +1452,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener //case the id header already generated. 
if (!message.isLargeMessage()) { - long id = storageManager.generateUniqueID(); + long id = storageManager.generateID(); message.setMessageID(id); message.encodeMessageIDToBuffer(); @@ -1658,6 +1775,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener toCancel.addAll(consumer.cancelRefs(clientFailed, lastMessageAsDelived, theTx)); } + //we need to check this before we cancel the refs and add them to the tx, any delivering refs will have been delivered + //after the last tx was rolled back so we should handle them separately. if not they + //will end up added to the tx but never ever handled even tho they were removed from the consumers delivering refs. + //we add them to a new tx and roll them back as the calling client will assume that this has happened. + if (theTx.getState() == State.ROLLEDBACK) + { + Transaction newTX = newTransaction(); + cancelAndRollback(clientFailed, newTX, wasStarted, toCancel); + throw new IllegalStateException("Transaction has already been rolled back"); + } + cancelAndRollback(clientFailed, theTx, wasStarted, toCancel); + } + + private void cancelAndRollback(boolean clientFailed, Transaction theTx, boolean wasStarted, List<MessageReference> toCancel) throws Exception + { for (MessageReference ref : toCancel) { ref.getQueue().cancel(theTx, ref); @@ -1683,7 +1815,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener theTx.rollback(); } - private void doSend(final ServerMessage msg, final boolean direct) throws Exception + protected void doSend(final ServerMessage msg, final boolean direct) throws Exception { // check the user has write access to this address. 
try @@ -1692,7 +1824,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener } catch (HornetQException e) { - if (!autoCommitSends) + if (!autoCommitSends && tx != null) { tx.markAsRollbackOnly(e); } @@ -1735,7 +1867,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (this.tx != null) { - QueueImpl.RefsOperation oper = (QueueImpl.RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION); + RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION); if (oper == null) { @@ -1752,4 +1884,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener } } + private static class DefaultTransactionFactory implements TransactionFactory + { + @Override + public Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds) + { + return new TransactionImpl(xid, storageManager, timeoutSeconds); + } + } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServiceRegistry.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServiceRegistry.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServiceRegistry.java new file mode 100644 index 0000000..0e0e809 --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServiceRegistry.java @@ -0,0 +1,128 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.server.impl; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import org.hornetq.api.core.Interceptor; +import org.hornetq.api.core.Pair; +import org.hornetq.core.config.ConnectorServiceConfiguration; +import org.hornetq.core.server.ConnectorServiceFactory; + +/** + * @author <a href="mailto:[email protected]">Martyn Taylor</a> + */ + +public class ServiceRegistry +{ + private ExecutorService executorService; + + private ScheduledExecutorService scheduledExecutorService; + + /* Interceptors are stored in a Map keyed by a caller-supplied name rather than by class, as HornetQ allows multiple instances of the same class to be added + * to the interceptor list + */ + private Map<String, Interceptor> incomingInterceptors; + + private Map<String, Interceptor> outgoingInterceptors; + + private Map<String, Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServices; + + public ServiceRegistry() + { + this.incomingInterceptors = new ConcurrentHashMap<String, Interceptor>(); + this.outgoingInterceptors = new ConcurrentHashMap<String, Interceptor>(); + this.connectorServices = new ConcurrentHashMap<String, Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>>(); + } + + public ExecutorService getExecutorService() + { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) + { + this.executorService = executorService; + } + + 
public ScheduledExecutorService getScheduledExecutorService() + { + return scheduledExecutorService; + } + + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) + { + this.scheduledExecutorService = scheduledExecutorService; + } + + public void addConnectorService(ConnectorServiceFactory connectorServiceFactory, ConnectorServiceConfiguration configuration) + { + connectorServices.put(configuration.getConnectorName(), new Pair<>(connectorServiceFactory, configuration)); + } + + public void removeConnectorService(ConnectorServiceConfiguration configuration) + { + connectorServices.remove(configuration.getConnectorName()); + } + + public Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices() + { + return connectorServices.values(); + } + + public void addIncomingInterceptor(String name, Interceptor interceptor) + { + incomingInterceptors.put(name, interceptor); + } + + public void removeIncomingInterceptor(String name) + { + incomingInterceptors.remove(name); + } + + public Collection<Interceptor> getIncomingInterceptors() + { + return Collections.unmodifiableCollection(incomingInterceptors.values()); + } + + public Interceptor getIncomingInterceptor(String name) + { + return incomingInterceptors.get(name); + } + + public void addOutgoingInterceptor(String name, Interceptor interceptor) + { + outgoingInterceptors.put(name, interceptor); + } + + public Interceptor getOutgoingInterceptor(String name) + { + return outgoingInterceptors.get(name); + } + + public void removeOutgoingInterceptor(String name) + { + outgoingInterceptors.remove(name); + } + + public Collection<Interceptor> getOutgoingInterceptors() + { + return Collections.unmodifiableCollection(outgoingInterceptors.values()); + } +}
