Author: rgodfrey
Date: Tue Jan 3 19:48:46 2012
New Revision: 1226930
URL: http://svn.apache.org/viewvc?rev=1226930&view=rev
Log:
QPID-3720 : [Java Broker] Implement Message Grouping
Added:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
- copied, changed from r1226752,
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Tue Jan 3 19:48:46 2012
@@ -149,7 +149,13 @@ public interface AMQQueue extends Managa
void removeMessagesFromQueue(long fromMessageId, long toMessageId);
-
+ static interface Visitor
+ {
+ boolean visit(QueueEntry entry);
+ }
+
+ void visit(Visitor visitor);
+
long getMaximumMessageSize();
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Tue Jan 3 19:48:46 2012
@@ -75,6 +75,11 @@ public interface QueueEntry extends Comp
{
return State.AVAILABLE;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -85,6 +90,11 @@ public interface QueueEntry extends Comp
{
return State.DEQUEUED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -95,6 +105,11 @@ public interface QueueEntry extends Comp
{
return State.DELETED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
public final class ExpiredState extends EntryState
@@ -104,6 +119,11 @@ public interface QueueEntry extends Comp
{
return State.EXPIRED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -113,6 +133,11 @@ public interface QueueEntry extends Comp
{
return State.ACQUIRED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
public final class SubscriptionAcquiredState extends EntryState
@@ -134,6 +159,11 @@ public interface QueueEntry extends Comp
{
return _subscription;
}
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _subscription +"}";
+ }
}
public final class SubscriptionAssignedState extends EntryState
@@ -155,6 +185,12 @@ public interface QueueEntry extends Comp
{
return _subscription;
}
+
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _subscription +"}";
+ }
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
Tue Jan 3 19:48:46 2012
@@ -35,4 +35,6 @@ public interface QueueEntryList<Q extend
Q getHead();
void entryDeleted(Q queueEntry);
+
+ int getPriorities();
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Tue Jan 3 19:48:46 2012
@@ -25,7 +25,6 @@ import org.apache.qpid.AMQSecurityExcept
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
@@ -33,7 +32,6 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.QueueConfigType;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -45,6 +43,7 @@ import org.apache.qpid.server.management
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.subscription.MessageGroupManager;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -68,11 +67,11 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
private static final Logger _logger =
Logger.getLogger(SimpleAMQQueue.class);
+ private static final String QPID_GROUP_HEADER_KEY =
"qpid.group_header_key";
private final VirtualHost _virtualHost;
@@ -189,6 +188,7 @@ public class SimpleAMQQueue implements A
/** the maximum delivery count for each message on this queue or 0 if
maximum delivery count is not to be enforced. */
private int _maximumDeliveryCount =
ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
+ private final MessageGroupManager _messageGroupManager;
protected SimpleAMQQueue(AMQShortString name, boolean durable,
AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost
virtualHost, Map<String,Object> arguments)
{
@@ -242,25 +242,15 @@ public class SimpleAMQQueue implements A
_logSubject = new QueueLogSubject(this);
_logActor = new QueueActor(this,
CurrentActor.get().getRootMessageLogger());
- // Log the correct creation message
-
- // Extract the number of priorities for this Queue.
- // Leave it as 0 if we are a SimpleQueueEntryList
- int priorities = 0;
- if (entryListFactory instanceof PriorityQueueList.Factory)
- {
- priorities = ((PriorityQueueList)_entries).getPriorities();
- }
-
// Log the creation of this Queue.
// The priorities display is toggled on if we set priorities > 0
CurrentActor.get().message(_logSubject,
QueueMessages.CREATED(String.valueOf(_owner),
- priorities,
- _owner != null,
- autoDelete,
- durable, !durable,
- priorities > 0));
+
_entries.getPriorities(),
+ _owner != null,
+ autoDelete,
+ durable, !durable,
+
_entries.getPriorities() > 0));
getConfigStore().addConfiguredObject(this);
@@ -274,6 +264,15 @@ public class SimpleAMQQueue implements A
_logger.error("AMQQueue MBean creation has failed ", e);
}
+ if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
+ {
+ _messageGroupManager = new
MessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), 255);
+ }
+ else
+ {
+ _messageGroupManager = null;
+ }
+
resetNotifications();
}
@@ -488,6 +487,32 @@ public class SimpleAMQQueue implements A
setExclusiveSubscriber(null);
subscription.setQueueContext(null);
+ if(_messageGroupManager != null)
+ {
+ QueueEntry entry =
_messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ _messageGroupManager.clearAssignments(subscription);
+
+ if(entry != null)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter =
_subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in
advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
+ {
+ Subscription sub =
subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues())
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+
+ }
+
+ }
+
// auto-delete queues must be deleted if there are no remaining
subscribers
if (_autoDelete && getDeleteOnNoConsumers() &&
!subscription.isTransient() && getConsumerCount() == 0 )
@@ -691,21 +716,20 @@ public class SimpleAMQQueue implements A
{
try
{
- if (subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended())
+ if (!sub.isSuspended()
+ && subscriptionReadyAndHasInterest(sub, entry)
+ && mightAssign(sub, entry)
+ && !sub.wouldSuspend(entry))
{
- if (!sub.wouldSuspend(entry))
+ if (sub.acquires() && !(assign(sub, entry) &&
entry.acquire(sub)))
{
- if (sub.acquires() && !entry.acquire(sub))
- {
- // restore credit here that would have been taken
away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
- sub.restoreCredit(entry);
- }
- else
- {
- deliverMessage(sub, entry, false);
- }
+ // restore credit here that would have been taken away
by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
+ sub.restoreCredit(entry);
+ }
+ else
+ {
+ deliverMessage(sub, entry, false);
}
}
}
@@ -716,6 +740,20 @@ public class SimpleAMQQueue implements A
}
}
+ private boolean assign(final Subscription sub, final QueueEntry entry)
+ {
+ return _messageGroupManager == null ||
_messageGroupManager.acceptMessage(sub, entry);
+ }
+
+
+ private boolean mightAssign(final Subscription sub, final QueueEntry entry)
+ {
+ if(_messageGroupManager == null || !sub.acquires())
+ return true;
+ Subscription assigned =
_messageGroupManager.getAssignedSubscription(entry);
+ return (assigned == null) || (assigned == sub);
+ }
+
protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
@@ -1020,6 +1058,8 @@ public class SimpleAMQQueue implements A
public boolean filterComplete();
}
+
+
public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId,
final long toMessageId)
{
return getMessagesOnTheQueue(new QueueEntryFilter()
@@ -1074,6 +1114,24 @@ public class SimpleAMQQueue implements A
}
+ public void visit(final Visitor visitor)
+ {
+ QueueEntryIterator queueListIterator = _entries.iterator();
+
+ while(queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+
+ if(!node.isDispensed())
+ {
+ if(visitor.visit(node))
+ {
+ break;
+ }
+ }
+ }
+ }
+
/**
* Returns a list of QueEntries from a given range of queue positions, eg
messages 5 to 10 on the queue.
*
@@ -1708,11 +1766,11 @@ public class SimpleAMQQueue implements A
if (node != null && node.isAvailable())
{
- if (sub.hasInterest(node))
+ if (sub.hasInterest(node) && mightAssign(sub, node))
{
if (!sub.wouldSuspend(node))
{
- if (sub.acquires() && !node.acquire(sub))
+ if (sub.acquires() && !(assign(sub, node) &&
node.acquire(sub)))
{
// restore credit here that would have been taken
away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
@@ -1769,7 +1827,8 @@ public class SimpleAMQQueue implements A
QueueEntry node = (releasedNode != null &&
lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
- while (node != null && (!node.isAvailable() || (expired =
node.expired()) || !sub.hasInterest(node)))
+ while (node != null && (!node.isAvailable() || (expired =
node.expired()) || !sub.hasInterest(node) ||
+ !mightAssign(sub,node)))
{
if (expired)
{
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
Tue Jan 3 19:48:46 2012
@@ -185,6 +185,11 @@ public class SimpleQueueEntryList implem
advanceHead();
}
+ public int getPriorities()
+ {
+ return 0;
+ }
+
static class Factory implements QueueEntryListFactory
{
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
Tue Jan 3 19:48:46 2012
@@ -51,13 +51,11 @@ public class SortedQueueEntryList implem
_propertyName = propertyName;
}
- @Override
public AMQQueue getQueue()
{
return _queue;
}
- @Override
public SortedQueueEntryImpl add(final ServerMessage message)
{
synchronized(_lock)
@@ -286,7 +284,6 @@ public class SortedQueueEntryList implem
return (node == null ? Colour.BLACK : node.getColour()) == colour;
}
- @Override
public SortedQueueEntryImpl next(final SortedQueueEntryImpl node)
{
synchronized(_lock)
@@ -316,13 +313,11 @@ public class SortedQueueEntryList implem
}
}
- @Override
public QueueEntryIterator<SortedQueueEntryImpl> iterator()
{
return new QueueEntryIteratorImpl(_head);
}
- @Override
public SortedQueueEntryImpl getHead()
{
return _head;
@@ -333,7 +328,6 @@ public class SortedQueueEntryList implem
return _root;
}
- @Override
public void entryDeleted(final SortedQueueEntryImpl entry)
{
synchronized(_lock)
@@ -431,6 +425,11 @@ public class SortedQueueEntryList implem
}
}
+ public int getPriorities()
+ {
+ return 0;
+ }
+
/**
* Swaps the position of the node in the tree with it's successor
* (that is the node with the next highest key)
Added:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java?rev=1226930&view=auto
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
(added)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
Tue Jan 3 19:48:46 2012
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class MessageGroupManager
+{
+ private static final Logger _logger =
LoggerFactory.getLogger(MessageGroupManager.class);
+
+
+ private final String _groupId;
+ private final ConcurrentHashMap<Integer, Subscription> _groupMap = new
ConcurrentHashMap<Integer, Subscription>();
+ private final int _groupMask;
+
+ public MessageGroupManager(final String groupId, final int maxGroups)
+ {
+ _groupId = groupId;
+ _groupMask = pow2(maxGroups)-1;
+ }
+
+ private static int pow2(final int i)
+ {
+ int val = 1;
+ while(val < i) val<<=1;
+ return val;
+ }
+
+ public Subscription getAssignedSubscription(final QueueEntry entry)
+ {
+ Object groupVal =
entry.getMessage().getMessageHeader().getHeader(_groupId);
+ return groupVal == null ? null : _groupMap.get(groupVal.hashCode() &
_groupMask);
+ }
+
+ public boolean acceptMessage(Subscription sub, QueueEntry entry)
+ {
+ Object groupVal =
entry.getMessage().getMessageHeader().getHeader(_groupId);
+ if(groupVal == null)
+ {
+ return true;
+ }
+ else
+ {
+ Integer group = groupVal.hashCode() & _groupMask;
+ Subscription assignedSub = _groupMap.get(group);
+ if(assignedSub == sub)
+ {
+ return true;
+ }
+ else
+ {
+ if(assignedSub == null)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Assigning group " + groupVal + " to sub
" + sub);
+ }
+ assignedSub = _groupMap.putIfAbsent(group, sub);
+ return assignedSub == null || assignedSub == sub;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ }
+
+ public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
+ {
+ EntryFinder visitor = new EntryFinder(sub);
+ sub.getQueue().visit(visitor);
+ return visitor.getEntry();
+ }
+
+ private class EntryFinder implements AMQQueue.Visitor
+ {
+ private QueueEntry _entry;
+ private Subscription _sub;
+
+ public EntryFinder(final Subscription sub)
+ {
+ _sub = sub;
+ }
+
+ public boolean visit(final QueueEntry entry)
+ {
+ if(!entry.isAvailable())
+ return false;
+
+ Object groupId =
entry.getMessage().getMessageHeader().getHeader(_groupId);
+ if(groupId == null)
+ return false;
+
+ Integer group = groupId.hashCode() & _groupMask;
+ Subscription assignedSub = _groupMap.get(group);
+ if(assignedSub == _sub)
+ {
+ _entry = entry;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public QueueEntry getEntry()
+ {
+ return _entry;
+ }
+ }
+
+ public void clearAssignments(Subscription sub)
+ {
+ Iterator<Subscription> subIter = _groupMap.values().iterator();
+ while(subIter.hasNext())
+ {
+ if(subIter.next() == sub)
+ {
+ subIter.remove();
+ }
+ }
+ }
+}
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
Tue Jan 3 19:48:46 2012
@@ -601,20 +601,20 @@ public class MockAMQQueue implements AMQ
}
- @Override
public int getMaximumDeliveryCount()
{
return 0;
}
- @Override
public void setMaximumDeliveryCount(int maximumDeliveryCount)
{
}
- @Override
public void setAlternateExchange(String exchangeName)
{
}
+ public void visit(final Visitor visitor)
+ {
+ }
}
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Tue Jan 3 19:48:46 2012
@@ -1583,6 +1583,11 @@ public abstract class AMQSession<C exten
return _prefetchLowMark;
}
+ public int getPrefetch()
+ {
+ return _prefetchHighMark;
+ }
+
public AMQShortString getDefaultQueueExchangeName()
{
return _connection.getDefaultQueueExchangeName();
@@ -3047,7 +3052,7 @@ public abstract class AMQSession<C exten
*/
public boolean prefetch()
{
- return getAMQConnection().getMaxPrefetch() > 0;
+ return _prefetchHighMark > 0;
}
/** Signifies that the session has pending sends to commit. */
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Tue Jan 3 19:48:46 2012
@@ -545,7 +545,7 @@ public class BasicMessageConsumer_0_10 e
}
else if (getSession().prefetch())
{
- capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+ capacity = getSession().getPrefetch();
}
return capacity;
}
Modified:
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
(original)
+++
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
Tue Jan 3 19:48:46 2012
@@ -588,7 +588,7 @@ public class AMQSession_0_10Test extends
}
boolean isTransacted = acknowledgeMode ==
javax.jms.Session.SESSION_TRANSACTED ? true : false;
AMQSession_0_10 session = new
AMQSession_0_10(createConnection(throwException), amqConnection, 1,
isTransacted, acknowledgeMode,
- 1, 1, "test");
+ 0, 0, "test");
return session;
}
@@ -600,7 +600,6 @@ public class AMQSession_0_10Test extends
connection.setSessionFactory(new SessionFactory()
{
- @Override
public Session newSession(Connection conn, Binary name, long
expiry)
{
return new MockSession(conn, new SessionDelegate(), name,
expiry, throwException);
@@ -611,7 +610,6 @@ public class AMQSession_0_10Test extends
private final class MockMessageListener implements MessageListener
{
- @Override
public void onMessage(Message arg0)
{
}
@@ -710,23 +708,19 @@ public class AMQSession_0_10Test extends
{
private List<ProtocolEvent> _sendEvents = new
ArrayList<ProtocolEvent>();
- @Override
public void setIdleTimeout(int i)
{
}
- @Override
public void send(ProtocolEvent msg)
{
_sendEvents.add(msg);
}
- @Override
public void flush()
{
}
- @Override
public void close()
{
}
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
Tue Jan 3 19:48:46 2012
@@ -166,6 +166,11 @@ public abstract class AMQTypedValue
private static final class IntTypedValue extends AMQTypedValue
{
+ @Override
+ public String toString()
+ {
+ return "[INT: " + String.valueOf(_value) + "]";
+ }
private final int _value;
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
Tue Jan 3 19:48:46 2012
@@ -44,6 +44,7 @@ import static org.apache.qpid.util.Seria
import static org.apache.qpid.util.Strings.toUTF8;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -828,8 +829,17 @@ public class Session extends SessionInvo
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
- checkFailoverRequired("Session sync was interrupted by
failover.");
- log.debug("%s waiting for[%d]: %d, %s", this, point,
maxComplete, Arrays.asList(commands));
+ checkFailoverRequired("Session sync was interrupted by
failover.");
+ if(log.isDebugEnabled())
+ {
+ List<Method> waitingFor =
+ Arrays.asList(commands)
+ .subList(mod(maxComplete,commands.length),
+ mod(commandsOut-1, commands.length)
< mod(maxComplete, commands.length)
+ ? commands.length-1
+ : mod(commandsOut-1,
commands.length));
+ log.debug("%s waiting for[%d]: %d, %s", this, point,
maxComplete, waitingFor);
+ }
w.await();
}
Copied:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
(from r1226752,
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java)
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java?p2=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java&p1=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java&r1=1226752&r2=1226930&rev=1226930&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
Tue Jan 3 19:48:46 2012
@@ -20,8 +20,13 @@
*/
package org.apache.qpid.server.queue;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -30,17 +35,14 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.NamingException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.HashMap;
+import java.util.Map;
-public class PriorityQueueTest extends QpidBrokerTestCase
+public class MessageGroupQueueTest extends QpidBrokerTestCase
{
private static final int TIMEOUT = 1500;
- protected final String QUEUE = "PriorityQueue";
+ protected final String QUEUE = "MessageGroupQueue";
private static final int MSG_COUNT = 50;
@@ -49,10 +51,8 @@ public class PriorityQueueTest extends Q
private Session producerSession;
private Queue queue;
private Connection consumerConnection;
- private Session consumerSession;
-
- private MessageConsumer consumer;
-
+
+
protected void setUp() throws Exception
{
super.setUp();
@@ -63,8 +63,7 @@ public class PriorityQueueTest extends Q
producerConnection.start();
consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
-
+
}
protected void tearDown() throws Exception
@@ -74,125 +73,275 @@ public class PriorityQueueTest extends Q
super.tearDown();
}
- public void testPriority() throws JMSException, NamingException,
AMQException
+ /**
+ * Pre populate the queue with messages with groups as follows
+ *
+ * ONE
+ * TWO
+ * ONE
+ * TWO
+ *
+ * Create two consumers with prefetch of 1, the first consumer should
then be assigned group ONE, the second
+ * consumer assigned group TWO if they are started in sequence.
+ *
+ * Thus doing
+ *
+ * c1 <--- (ONE)
+ * c2 <--- (TWO)
+ * c2 ack --->
+ *
+ * c2 should now be able to receive a second message from group TWO
(skipping over the message from group ONE)
+ *
+ * i.e.
+ *
+ * c2 <--- (TWO)
+ * c2 ack --->
+ * c1 <--- (ONE)
+ * c1 ack --->
+ *
+ */
+ public void testSimpleGroupAssignment() throws Exception
{
final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-priorities",10);
+ arguments.put("qpid.group_header_key","group");
+ arguments.put("qpid.shared_msg_group","1");
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE),
true, false, false, arguments);
queue = (Queue)
producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
- for (int msg = 0; msg < MSG_COUNT; msg++)
+ String[] groups = { "ONE", "TWO"};
+
+ for (int msg = 0; msg < 4; msg++)
{
- producer.setPriority(msg % 10);
- producer.send(nextMessage(msg, false, producerSession, producer));
+ producer.send(createMessage(msg, groups[msg % groups.length]));
}
producerSession.commit();
producer.close();
producerSession.close();
producerConnection.close();
- consumer = consumerSession.createConsumer(queue);
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(false,
Session.CLIENT_ACKNOWLEDGE,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(false,
Session.CLIENT_ACKNOWLEDGE,1);
+
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
consumerConnection.start();
- Message received;
- int receivedCount = 0;
- Message previous = null;
- int messageCount = 0;
- while((received = consumer.receive(1000))!=null)
- {
- messageCount++;
- if(previous != null)
- {
- assertTrue("Messages arrived in unexpected order " +
messageCount + " " + previous.getIntProperty("msg") + " " +
received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " +
received.getJMSPriority(), (previous.getJMSPriority() >
received.getJMSPriority()) || ((previous.getJMSPriority() ==
received.getJMSPriority()) && previous.getIntProperty("msg") <
received.getIntProperty("msg")) );
- }
+ Message cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received first message",
cs1Received);
- previous = received;
- receivedCount++;
- }
+ Message cs2Received = consumer2.receive(1000);
- assertEquals("Incorrect number of message received", 50,
receivedCount);
+ assertNotNull("Consumer 2 should have received first message",
cs2Received);
+
+ cs1Received.acknowledge();
+ cs2Received.acknowledge();
+
+ Message cs2Received2 = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received second message",
cs2Received2);
+ assertEquals("Differing groups",
cs2Received2.getStringProperty("group"),
+ cs2Received.getStringProperty("group"));
+
+ Message cs1Received2 = consumer1.receive(1000);
+
+ assertNotNull("Consumer 1 should have received second message",
cs1Received2);
+ assertEquals("Differing groups",
cs1Received2.getStringProperty("group"),
+ cs1Received.getStringProperty("group"));
+
+ cs1Received2.acknowledge();
+ cs2Received2.acknowledge();
+
+ assertNull(consumer1.receive(1000));
+ assertNull(consumer2.receive(1000));
}
- public void testOddOrdering() throws AMQException, JMSException
+ /**
+ *
+ * Tests that upon closing a consumer, groups previously assigned to that
consumer are reassigned to a different
+ * consumer.
+ *
+ * Pre-populate the queue as ONE, ONE, TWO, ONE
+ *
+ * create in sequence two consumers
+ *
+ * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
+ *
+ * Then close c1 before acking.
+ *
+ * If we now attempt to receive from c2, then the remaining messages in
group ONE should be available (which
+ * requires c2 to go "backwards" in the queue).
+ *
+ * */
+ public void testConsumerCloseGroupAssignment() throws Exception
{
final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-priorities",3);
+ arguments.put("qpid.group_header_key","group");
+ arguments.put("qpid.shared_msg_group","1");
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE),
true, false, false, arguments);
- queue =
producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+ queue = (Queue)
producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
- // In order ABC
- producer.setPriority(9);
- producer.send(nextMessage(1, false, producerSession, producer));
- producer.setPriority(4);
- producer.send(nextMessage(2, false, producerSession, producer));
- producer.setPriority(1);
- producer.send(nextMessage(3, false, producerSession, producer));
-
- // Out of order BAC
- producer.setPriority(4);
- producer.send(nextMessage(4, false, producerSession, producer));
- producer.setPriority(9);
- producer.send(nextMessage(5, false, producerSession, producer));
- producer.setPriority(1);
- producer.send(nextMessage(6, false, producerSession, producer));
-
- // Out of order BCA
- producer.setPriority(4);
- producer.send(nextMessage(7, false, producerSession, producer));
- producer.setPriority(1);
- producer.send(nextMessage(8, false, producerSession, producer));
- producer.setPriority(9);
- producer.send(nextMessage(9, false, producerSession, producer));
-
- // Reverse order CBA
- producer.setPriority(1);
- producer.send(nextMessage(10, false, producerSession, producer));
- producer.setPriority(4);
- producer.send(nextMessage(11, false, producerSession, producer));
- producer.setPriority(9);
- producer.send(nextMessage(12, false, producerSession, producer));
+ producer.send(createMessage(1, "ONE"));
+ producer.send(createMessage(2, "ONE"));
+ producer.send(createMessage(3, "TWO"));
+ producer.send(createMessage(4, "ONE"));
+
producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(false,
Session.CLIENT_ACKNOWLEDGE,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(false,
Session.CLIENT_ACKNOWLEDGE,1);
+
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
- consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+ Message cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received first message",
cs1Received);
+
+ Message cs2Received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received first message",
cs2Received);
+ cs2Received.acknowledge();
+
+ Message cs2Received2 = consumer2.receive(1000);
+
+ assertNull("Consumer 2 should not have received second message",
cs2Received2);
+
+ consumer1.close();
+
+ cs1Received.acknowledge();
+ Message cs2Received3 = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received second message",
cs2Received3);
+ assertEquals("Unexpected group",
cs2Received3.getStringProperty("group"),
+ "ONE");
+
+ cs2Received3.acknowledge();
+
+
+ Message cs2Received4 = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received third message",
cs2Received4);
+ assertEquals("Unexpected group",
cs2Received4.getStringProperty("group"),
+ "ONE");
+
+ cs2Received4.acknowledge();
+
+ assertNull(consumer2.receive(1000));
+ }
+
+
+ /**
+ *
+ * Tests that upon closing a consumer and its session, groups previously
assigned to that consumer are reassigned
+ * toa different consumer, including messages which were previously
delivered but have now been released.
+ *
+ * Pre-populate the queue as ONE, ONE, TWO, ONE
+ *
+ * create in sequence two consumers
+ *
+ * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
+ *
+ * Then close c1 and its session without acking.
+ *
+ * If we now attempt to receive from c2, then the all messages in group
ONE should be available (which
+ * requires c2 to go "backwards" in the queue). The first such message
should be marked as redelivered
+ *
+ */
+
+ public void testConsumerCloseWithRelease() throws Exception
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.group_header_key","group");
+ arguments.put("qpid.shared_msg_group","1");
+
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE),
true, false, false, arguments);
+ queue = (Queue)
producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ producer.send(createMessage(1, "ONE"));
+ producer.send(createMessage(2, "ONE"));
+ producer.send(createMessage(3, "TWO"));
+ producer.send(createMessage(4, "ONE"));
+
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(false,
Session.CLIENT_ACKNOWLEDGE,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(false,
Session.CLIENT_ACKNOWLEDGE,1);
+
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+ consumerConnection.start();
+ Message cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received first message",
cs1Received);
+
+ Message received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received first message",
received);
+ Message first = received;
+
+ received = consumer2.receive(1000);
+
+ assertNull("Consumer 2 should not have received second message",
received);
+
+ consumer1.close();
+ cs1.close();
+ first.acknowledge();
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received second message",
received);
+ assertEquals("Unexpected group", received.getStringProperty("group"),
+ "ONE");
+ assertTrue("Expected second message to be marked as redelivered " +
received.getIntProperty("msg"),
+ received.getJMSRedelivered());
+
+ received.acknowledge();
+
+
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received third message",
received);
+ assertEquals("Unexpected group", received.getStringProperty("group"),
+ "ONE");
+
+ received.acknowledge();
+
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received fourth message",
received);
+ assertEquals("Unexpected group", received.getStringProperty("group"),
+ "ONE");
+
+ received.acknowledge();
+
- Message msg = consumer.receive(TIMEOUT);
- assertEquals(1, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(5, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(9, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(12, msg.getIntProperty("msg"));
-
- msg = consumer.receive(TIMEOUT);
- assertEquals(2, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(4, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(7, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(11, msg.getIntProperty("msg"));
-
- msg = consumer.receive(TIMEOUT);
- assertEquals(3, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(6, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(8, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(10, msg.getIntProperty("msg"));
+ assertNull(consumer2.receive(1000));
}
- private Message nextMessage(int msg, boolean first, Session
producerSession, MessageProducer producer) throws JMSException
+
+ private Message createMessage(int msg, String group) throws JMSException
{
Message send = producerSession.createTextMessage("Message: " + msg);
send.setIntProperty("msg", msg);
+ send.setStringProperty("group", group);
return send;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]