Author: robbie
Date: Mon Jan 20 21:33:58 2014
New Revision: 1559833

URL: http://svn.apache.org/r1559833
Log:
QPID-5496: WIP on a common way of configuring the default-group for a queue, 
and allowing queues with 'shared groups' to support non-grouped messages

Modified:
    
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
    
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
    
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
    
qpid/branches/QPID-5496_default_groups/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java

Modified: 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1559833&r1=1559832&r2=1559833&view=diff
==============================================================================
--- 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Mon Jan 20 21:33:58 2014
@@ -71,6 +71,7 @@ public class SimpleAMQQueue implements A
     public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
     private static final String QPID_NO_GROUP = "qpid.no-group";
     private static final String DEFAULT_SHARED_MESSAGE_GROUP = 
System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, 
QPID_NO_GROUP);
+    private static final String QPID_NO_DEFAULT_GROUP = 
"qpid.no-default-group";
 
     // TODO - should make this configurable at the vhost / broker level
     private static final int DEFAULT_MAX_GROUPS = 255;
@@ -246,19 +247,31 @@ public class SimpleAMQQueue implements A
 
         if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY))
         {
-            if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
-               && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
+            String messageGroupKey = 
String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY));
+            boolean requestedSharedGroups = 
arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null && 
(Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS));
+
+            //Determine the default group value
+            String defaultGroup = requestedSharedGroups ? 
DEFAULT_SHARED_MESSAGE_GROUP : null;
+            if(arguments.containsKey(Queue.MESSAGE_GROUP_DEFAULT_GROUP))
+            {
+                Object defaultGroupArg = 
arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
+
+                defaultGroup = defaultGroupArg == null ? null : 
defaultGroupArg.toString();
+            }
+
+            //Remove the default group if requested by the configured value
+            if(QPID_NO_DEFAULT_GROUP.equals(defaultGroup))
+            {
+                defaultGroup = null;
+            }
+
+            if(requestedSharedGroups)
             {
-                Object defaultGroup = 
arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
-                _messageGroupManager =
-                        new 
DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
-                                defaultGroup == null ? 
DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
-                                this);
+                _messageGroupManager = new 
DefinedGroupMessageGroupManager(messageGroupKey, defaultGroup, this);
             }
             else
             {
-                _messageGroupManager = new 
AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
-                        Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
+                _messageGroupManager = new 
AssignedSubscriptionMessageGroupManager(messageGroupKey, defaultGroup, 
DEFAULT_MAX_GROUPS);
             }
         }
         else
@@ -541,7 +554,7 @@ public class SimpleAMQQueue implements A
 
     public void resetSubPointersForGroups(Subscription subscription, boolean 
clearAssignments)
     {
-        QueueEntry entry = 
_messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+        QueueEntry entry = 
_messageGroupManager.findEarliestAssignedAvailableEntry(subscription, this);
         if(clearAssignments)
         {
             _messageGroupManager.clearAssignments(subscription);

Modified: 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java?rev=1559833&r1=1559832&r2=1559833&view=diff
==============================================================================
--- 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
 (original)
+++ 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
 Mon Jan 20 21:33:58 2014
@@ -20,10 +20,12 @@
  */
 package org.apache.qpid.server.subscription;
 
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntryVisitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.qpid.server.queue.QueueEntry;
 
 import java.util.Iterator;
@@ -36,12 +38,14 @@ public class AssignedSubscriptionMessage
 
 
     private final String _groupId;
+    private String _defaultGroup;
     private final ConcurrentHashMap<Integer, Subscription> _groupMap = new 
ConcurrentHashMap<Integer, Subscription>();
     private final int _groupMask;
 
-    public AssignedSubscriptionMessageGroupManager(final String groupId, final 
int maxGroups)
+    public AssignedSubscriptionMessageGroupManager(final String groupId, 
String defaultGroup, final int maxGroups)
     {
         _groupId = groupId;
+        _defaultGroup = defaultGroup;
         _groupMask = pow2(maxGroups)-1;
     }
 
@@ -55,9 +59,21 @@ public class AssignedSubscriptionMessage
         return val;
     }
 
+    private Object getKey(QueueEntry entry)
+    {
+        ServerMessage<?> message = entry.getMessage();
+        AMQMessageHeader messageHeader = message == null ? null : 
message.getMessageHeader();
+        Object groupVal = messageHeader == null ? _defaultGroup : 
messageHeader.getHeader(_groupId);
+        if(groupVal == null)
+        {
+            groupVal = _defaultGroup;
+        }
+        return groupVal;
+    }
+
     public Subscription getAssignedSubscription(final QueueEntry entry)
     {
-        Object groupVal = 
entry.getMessage().getMessageHeader().getHeader(_groupId);
+        Object groupVal = getKey(entry);
         return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & 
_groupMask);
     }
 
@@ -75,7 +91,7 @@ public class AssignedSubscriptionMessage
 
     private boolean assignMessage(Subscription sub, QueueEntry entry)
     {
-        Object groupVal = 
entry.getMessage().getMessageHeader().getHeader(_groupId);
+        Object groupVal = getKey(entry);
         if(groupVal == null)
         {
             return true;
@@ -107,10 +123,10 @@ public class AssignedSubscriptionMessage
         }
     }
     
-    public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
+    public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub, 
AMQQueue queue)
     {
         EntryFinder visitor = new EntryFinder(sub);
-        sub.getQueue().visit(visitor);
+        queue.visit(visitor);
         return visitor.getEntry();
     }
 
@@ -131,21 +147,25 @@ public class AssignedSubscriptionMessage
                 return false;
             }
 
-            Object groupId = 
entry.getMessage().getMessageHeader().getHeader(_groupId);
+            Object groupId = getKey(entry);
             if(groupId == null)
             {
-                return false;
+                //message is not part of a group, anyone who wants it can 
consume it
+                _entry = entry;
+                return true;
             }
 
             Integer group = groupId.hashCode() & _groupMask;
             Subscription assignedSub = _groupMap.get(group);
-            if(assignedSub == _sub)
+            if(assignedSub == _sub || assignedSub == null)
             {
+                //group is either not assigned or is assigned to this 
subscription
                 _entry = entry;
                 return true;
             }
             else
             {
+                //group is already assigned to another subscription
                 return false;
             }
         }

Modified: 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java?rev=1559833&r1=1559832&r2=1559833&view=diff
==============================================================================
--- 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
 (original)
+++ 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
 Mon Jan 20 21:33:58 2014
@@ -20,10 +20,10 @@
  */
 package org.apache.qpid.server.subscription;
 
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntryVisitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -126,29 +126,54 @@ public class DefinedGroupMessageGroupMan
         _resetHelper = resetHelper;
     }
     
-    public synchronized Subscription getAssignedSubscription(final QueueEntry 
entry)
+    public Subscription getAssignedSubscription(final QueueEntry entry)
     {
         Object groupId = getKey(entry);
 
-        Group group = _groupMap.get(groupId);
-        return group == null || !group.isValid() ? null : 
group.getSubscription();
+        if(groupId == null)
+        {
+            return null;
+        }
+        else
+        {
+            synchronized (this)
+            {
+                Group group = _groupMap.get(groupId);
+                return group == null || !group.isValid() ? null : 
group.getSubscription();
+            }
+        }
     }
 
-    public synchronized boolean acceptMessage(final Subscription sub, final 
QueueEntry entry)
+    public boolean acceptMessage(final Subscription sub, final QueueEntry 
entry)
     {
-        if(assignMessage(sub, entry))
+        Object groupId = getKey(entry);
+        if(groupId == null)
         {
             return entry.acquire(sub);
         }
         else
         {
-            return false;
+            synchronized (this)
+            {
+                if(assignMessage(sub, entry, groupId))
+                {
+                    return entry.acquire(sub);
+                }
+                else
+                {
+                    return false;
+                }
+            }
         }
     }
 
-    private boolean assignMessage(final Subscription sub, final QueueEntry 
entry)
+    private boolean assignMessage(final Subscription sub, final QueueEntry 
entry, Object groupId)
     {
-        Object groupId = getKey(entry);
+        if(groupId == null)
+        {
+            return true;
+        }
+
         Group group = _groupMap.get(groupId);
 
         if(group == null || !group.isValid())
@@ -157,7 +182,7 @@ public class DefinedGroupMessageGroupMan
 
             _groupMap.put(groupId, group);
 
-            // there's a small change that the group became empty between the 
point at which getNextAvailable() was
+            // there's a small chance that the group became empty between the 
point at which SAMQQ#getNextAvailableEntry() was
             // called on the subscription, and when accept message is 
called... in that case we want to avoid delivering
             // out of order
             if(_resetHelper.isEntryAheadOfSubscription(entry, sub))
@@ -179,10 +204,10 @@ public class DefinedGroupMessageGroupMan
         }
     }
 
-    public synchronized QueueEntry findEarliestAssignedAvailableEntry(final 
Subscription sub)
+    public synchronized QueueEntry findEarliestAssignedAvailableEntry(final 
Subscription sub, AMQQueue queue)
     {
         EntryFinder visitor = new EntryFinder(sub);
-        sub.getQueue().visit(visitor);
+        queue.visit(visitor);
         return visitor.getEntry();
     }
 
@@ -204,16 +229,26 @@ public class DefinedGroupMessageGroupMan
             }
 
             Object groupId = getKey(entry);
-
-            Group group = _groupMap.get(groupId);
-            if(group != null && group.getSubscription() == _sub)
+            if(groupId != null)
             {
-                _entry = entry;
-                return true;
+                Group group = _groupMap.get(groupId);
+                if(group == null || group.getSubscription() == null || 
group.getSubscription() == _sub)
+                {
+                    //group is either not assigned or is assigned to this 
subscription
+                    _entry = entry;
+                    return true;
+                }
+                else
+                {
+                    //group is already assigned to another subscription
+                    return false;
+                }
             }
             else
             {
-                return false;
+                //message is not part of a group, anyone who wants it can 
consume it
+                _entry = entry;
+                return true;
             }
         }
 
@@ -230,7 +265,7 @@ public class DefinedGroupMessageGroupMan
     
     private Object getKey(QueueEntry entry)
     {
-        ServerMessage message = entry.getMessage();
+        ServerMessage<?> message = entry.getMessage();
         AMQMessageHeader messageHeader = message == null ? null : 
message.getMessageHeader();
         Object groupVal = messageHeader == null ? _defaultGroup : 
messageHeader.getHeader(_groupId);
         if(groupVal == null)

Modified: 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java?rev=1559833&r1=1559832&r2=1559833&view=diff
==============================================================================
--- 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
 (original)
+++ 
qpid/branches/QPID-5496_default_groups/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
 Mon Jan 20 21:33:58 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.subscription;
 
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 
 public interface MessageGroupManager
@@ -35,7 +36,7 @@ public interface MessageGroupManager
 
     boolean acceptMessage(Subscription sub, QueueEntry entry);
 
-    QueueEntry findEarliestAssignedAvailableEntry(Subscription sub);
+    QueueEntry findEarliestAssignedAvailableEntry(Subscription sub, AMQQueue 
queue);
 
     void clearAssignments(Subscription sub);
 }

Modified: 
qpid/branches/QPID-5496_default_groups/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-5496_default_groups/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java?rev=1559833&r1=1559832&r2=1559833&view=diff
==============================================================================
--- 
qpid/branches/QPID-5496_default_groups/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
 (original)
+++ 
qpid/branches/QPID-5496_default_groups/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
 Mon Jan 20 21:33:58 2014
@@ -227,6 +227,7 @@ public class MessageGroupQueueTest exten
         producerSession.close();
         producerConnection.close();
 
+        //sessions with a prefetch of 1
         Session cs1 = ((AMQConnection)consumerConnection).createSession(true, 
Session.SESSION_TRANSACTED,1);
         Session cs2 = ((AMQConnection)consumerConnection).createSession(true, 
Session.SESSION_TRANSACTED,1);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to