Author: rgodfrey
Date: Mon Aug 24 20:37:19 2009
New Revision: 807369

URL: http://svn.apache.org/viewvc?rev=807369&view=rev
Log:
Implement 0-10 flow control

Added:
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
Modified:
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
    
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
    
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
    
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
 Mon Aug 24 20:37:19 2009
@@ -43,7 +43,7 @@
         return _bytesCredit.get();
     }
 
-    public void addCredit(long messageCredit, long bytesCredit)
+    public void restoreCredit(long messageCredit, long bytesCredit)
     {
         _bytesCredit.addAndGet(bytesCredit);
         setSuspended(false);

Added: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java?rev=807369&view=auto
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
 (added)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
 Mon Aug 24 20:37:19 2009
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.flow;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
+{
+    /** Remaining byte credit; a negative value means unlimited. */
+    private volatile long _bytesCredit;
+    /** Remaining message credit; a negative value means unlimited. */
+    private volatile long _messageCredit;
+
+    /** Creates a manager that starts with zero byte credit and zero message credit. */
+    public CreditCreditManager()
+    {
+        this(0L, 0L);
+    }
+
+    /** Creates a manager holding the given initial byte and message credit. */
+    public CreditCreditManager(long bytesCredit, long messageCredit)
+    {
+        _bytesCredit = bytesCredit;
+        _messageCredit = messageCredit;
+    }
+
+    /**
+     * Overwrites both credit values, then suspends the manager when either one is
+     * exactly zero and resumes it otherwise.
+     *
+     * @param bytesCredit   new byte credit
+     * @param messageCredit new message credit
+     */
+    public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit)
+    {
+        _bytesCredit = bytesCredit;
+        _messageCredit = messageCredit;
+        final boolean exhausted = !hasCredit();
+        setSuspended(exhausted);
+    }
+
+    /** @return the remaining message credit; the value -1 is reported as {@code Long.MAX_VALUE} */
+    public long getMessageCredit()
+    {
+        if(_messageCredit == -1L)
+        {
+            return Long.MAX_VALUE;
+        }
+        return _messageCredit;
+    }
+
+    /** @return the remaining byte credit; the value -1 is reported as {@code Long.MAX_VALUE} */
+    public long getBytesCredit()
+    {
+        if(_bytesCredit == -1L)
+        {
+            return Long.MAX_VALUE;
+        }
+        return _bytesCredit;
+    }
+
+    /** Does nothing in this implementation: neither credit value is modified. */
+    public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
+    {
+    }
+
+    /**
+     * Adds credit to whichever of the two counters is currently limited (non-negative);
+     * non-positive amounts are ignored. Afterwards the suspended state is recomputed
+     * from {@link #hasCredit()}.
+     *
+     * @param messageCredit number of messages to add
+     * @param bytesCredit   number of bytes to add
+     */
+    public synchronized void addCredit(final long messageCredit, final long bytesCredit)
+    {
+        // Cleared only when message credit is being topped up from exactly zero.
+        boolean notifyIncrease = true;
+
+        final boolean topUpMessages = messageCredit > 0L && _messageCredit >= 0L;
+        if(topUpMessages)
+        {
+            notifyIncrease = (_messageCredit != 0L);
+            _messageCredit += messageCredit;
+        }
+
+        final boolean topUpBytes = bytesCredit > 0L && _bytesCredit >= 0L;
+        if(topUpBytes)
+        {
+            _bytesCredit += bytesCredit;
+
+            // bytesCredit is known to be positive here, so the flag alone decides
+            // whether notifyIncreaseBytesCredit() is invoked.
+            if(notifyIncrease)
+            {
+                notifyIncreaseBytesCredit();
+            }
+        }
+
+        setSuspended(!hasCredit());
+    }
+
+    /**
+     * @return {@code true} unless one of the two credit values is exactly zero;
+     *         negative (unlimited) values count as having credit
+     */
+    public synchronized boolean hasCredit()
+    {
+        return !(_bytesCredit == 0L || _messageCredit == 0L);
+    }
+
+    /**
+     * Tries to pay for one message out of the available credit.
+     *
+     * @param msg the message whose size is charged against the byte credit
+     * @return {@code true} if the message may be sent (every limited counter has been
+     *         decremented); {@code false} if message credit is zero or the message is too big
+     */
+    public synchronized boolean useCreditForMessage(final ServerMessage msg)
+    {
+        final boolean messagesLimited = _messageCredit >= 0L;
+        final boolean bytesLimited = _bytesCredit >= 0L;
+
+        if(messagesLimited && _messageCredit == 0L)
+        {
+            // Running out of message credit is the only outcome that suspends.
+            setSuspended(true);
+            return false;
+        }
+
+        if(bytesLimited && msg.getSize() > _bytesCredit)
+        {
+            // Too large for the remaining byte credit; state is left untouched.
+            return false;
+        }
+
+        if(messagesLimited)
+        {
+            _messageCredit--;
+        }
+        if(bytesLimited)
+        {
+            _bytesCredit -= msg.getSize();
+        }
+        return true;
+    }
+
+    /** Drops any positive credit to zero; zero and negative (unlimited) values are kept. */
+    public synchronized void stop()
+    {
+        if(_bytesCredit > 0)
+        {
+            _bytesCredit = 0;
+        }
+        if(_messageCredit > 0)
+        {
+            _messageCredit = 0;
+        }
+    }
+}

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
 Mon Aug 24 20:37:19 2009
@@ -38,11 +38,10 @@
 
     boolean removeListener(FlowCreditManagerListener listener);
 
-    public void addCredit(long messageCredit, long bytesCredit);
-
-    public void removeAllCredit();
+    public void restoreCredit(long messageCredit, long bytesCredit);
 
     public boolean hasCredit();
 
     public boolean useCreditForMessage(ServerMessage msg);
+
 }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
 Mon Aug 24 20:37:19 2009
@@ -34,7 +34,7 @@
         return -1L;
     }
 
-    public void addCredit(long messageCredit, long bytesCredit)
+    public void restoreCredit(long messageCredit, long bytesCredit)
     {
     }
 

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
 Mon Aug 24 20:37:19 2009
@@ -43,7 +43,7 @@
         return _bytesCredit;
     }
 
-    public synchronized void addCredit(long messageCredit, long bytesCredit)
+    public synchronized void restoreCredit(long messageCredit, long 
bytesCredit)
     {        
         _messageCredit += messageCredit;
         _bytesCredit += bytesCredit;

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
 Mon Aug 24 20:37:19 2009
@@ -43,7 +43,7 @@
         return -1L;
     }
 
-    public void addCredit(long messageCredit, long bytesCredit)
+    public void restoreCredit(long messageCredit, long bytesCredit)
     {
         _messageCredit.addAndGet(messageCredit);
         setSuspended(false);

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
 Mon Aug 24 20:37:19 2009
@@ -91,7 +91,7 @@
         return _bytesCredit;
     }
 
-    public synchronized void addCredit(final long messageCredit, final long 
bytesCredit)
+    public synchronized void restoreCredit(final long messageCredit, final 
long bytesCredit)
     {
         final long messageCreditLimit = _messageCreditLimit;
         boolean notifyIncrease = true;

Added: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java?rev=807369&view=auto
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
 (added)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
 Mon Aug 24 20:37:19 2009
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.flow;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
+{
+    /** Window size in bytes; a negative value means the byte window is unlimited. */
+    private volatile long _bytesCreditLimit;
+    /** Window size in messages; a negative value means the message window is unlimited. */
+    private volatile long _messageCreditLimit;
+
+    /** Bytes currently charged against the byte window. */
+    private volatile long _bytesUsed;
+    /** Messages currently charged against the message window. */
+    private volatile long _messageUsed;
+
+    /** Creates a manager whose byte and message windows are both zero. */
+    public WindowCreditManager()
+    {
+        this(0L, 0L);
+    }
+
+    /** Creates a manager with the given byte and message window sizes and nothing used. */
+    public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit)
+    {
+        _bytesCreditLimit = bytesCreditLimit;
+        _messageCreditLimit = messageCreditLimit;
+    }
+
+    /**
+     * Replaces both window sizes and recomputes the suspended state from
+     * {@link #hasCredit()}; the used counters are left as they are.
+     *
+     * @param bytesCreditLimit   new byte window
+     * @param messageCreditLimit new message window
+     */
+    public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
+    {
+        _bytesCreditLimit = bytesCreditLimit;
+        _messageCreditLimit = messageCreditLimit;
+        setSuspended(!hasCredit());
+    }
+
+    /** @return unused message credit: {@code Long.MAX_VALUE} for a limit of -1, never negative */
+    public long getMessageCredit()
+    {
+        return _messageCreditLimit == -1L
+                    ? Long.MAX_VALUE
+                    : _messageUsed < _messageCreditLimit ? _messageCreditLimit - _messageUsed : 0L;
+    }
+
+    /** @return unused byte credit: {@code Long.MAX_VALUE} for a limit of -1, never negative */
+    public long getBytesCredit()
+    {
+        return _bytesCreditLimit == -1L
+                    ? Long.MAX_VALUE
+                    : _bytesUsed < _bytesCreditLimit ? _bytesCreditLimit - _bytesUsed : 0L;
+    }
+
+    /**
+     * Returns previously used credit to the window. Each used counter is only adjusted
+     * while its limit is positive, and is clamped so that it never drops below zero.
+     *
+     * @param messageCredit number of messages to give back
+     * @param bytesCredit   number of bytes to give back
+     */
+    public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
+    {
+        boolean notifyIncrease = true;
+        if(_messageCreditLimit > 0L)
+        {
+            // No notification if the message window was completely used before this call.
+            notifyIncrease = (_messageUsed != _messageCreditLimit);
+            _messageUsed -= messageCredit;
+            //TODO log warning
+            if(_messageUsed < 0L)
+            {
+                _messageUsed = 0;
+            }
+        }
+
+        if(_bytesCreditLimit > 0L)
+        {
+            notifyIncrease = notifyIncrease && bytesCredit>0;
+            _bytesUsed -= bytesCredit;
+            //TODO log warning
+            if(_bytesUsed < 0L)
+            {
+                _bytesUsed = 0;
+            }
+
+            if(notifyIncrease)
+            {
+                notifyIncreaseBytesCredit();
+            }
+        }
+
+        setSuspended(!hasCredit());
+    }
+
+    /** @return {@code true} when each window is either unlimited (negative) or not yet full */
+    public synchronized boolean hasCredit()
+    {
+        return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
+                && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
+    }
+
+    /**
+     * Charges one message against the windows if it fits.
+     *
+     * @param msg the message whose size is charged against the byte window
+     * @return {@code true} if the message may be sent; {@code false} if the message
+     *         window is full or the message does not fit in the byte window
+     */
+    public synchronized boolean useCreditForMessage(final ServerMessage msg)
+    {
+        final boolean messagesLimited = _messageCreditLimit >= 0L;
+        final boolean bytesLimited = _bytesCreditLimit >= 0L;
+
+        if(messagesLimited && _messageUsed >= _messageCreditLimit)
+        {
+            // A full message window is the only outcome that suspends the manager.
+            setSuspended(true);
+            return false;
+        }
+        if(bytesLimited && _bytesUsed + msg.getSize() > _bytesCreditLimit)
+        {
+            return false;
+        }
+
+        if(messagesLimited)
+        {
+            _messageUsed++;
+        }
+        if(bytesLimited)
+        {
+            _bytesUsed += msg.getSize();
+        }
+        return true;
+    }
+
+    /** Sets every positive limit to zero; zero and negative (unlimited) limits are kept. */
+    public synchronized void stop()
+    {
+        if(_bytesCreditLimit > 0)
+        {
+            _bytesCreditLimit = 0;
+        }
+        if(_messageCreditLimit > 0)
+        {
+            _messageCreditLimit = 0;
+        }
+    }
+
+    /**
+     * Enlarges the windows. Arguments are in (messages, bytes) order, matching
+     * {@code restoreCredit} and the calls made by {@code Subscription_0_10.addCredit}.
+     * A positive amount is added to a limited (non-negative) window, -1 makes the
+     * window unlimited, and an unlimited window is never made finite by adding to it.
+     *
+     * @param count messages to add to the message window, or -1 for unlimited
+     * @param bytes bytes to add to the byte window, or -1 for unlimited
+     */
+    public synchronized void addCredit(long count, long bytes)
+    {
+        if(count > 0 && _messageCreditLimit >= 0L)
+        {
+            _messageCreditLimit += count;
+        }
+        else if(count == -1)
+        {
+            _messageCreditLimit = -1;
+        }
+
+        if(bytes > 0 && _bytesCreditLimit >= 0L)
+        {
+            _bytesCreditLimit += bytes;
+        }
+        else if(bytes == -1)
+        {
+            _bytesCreditLimit = -1;
+        }
+    }
+}

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Mon Aug 24 20:37:19 2009
@@ -439,7 +439,7 @@
             {
                 if (!sub.wouldSuspend(entry))
                 {
-                    if (!sub.isBrowser() && !entry.acquire(sub))
+                    if (sub.acquires() && !entry.acquire(sub))
                     {
                         // restore credit here that would have been taken away 
by wouldSuspend since we didn't manage
                         // to acquire the entry for this subscription
@@ -556,7 +556,7 @@
             Subscription sub = subscriberIter.getNode().getSubscription();
 
             // we don't make browsers send the same stuff twice
-            if (!sub.isBrowser())
+            if (sub.seesRequeues())
             {
                 updateLastSeenEntry(sub, entry);
             }
@@ -1255,7 +1255,7 @@
                     {
                         if (!sub.wouldSuspend(node))
                         {
-                            if (!sub.isBrowser() && !node.acquire(sub))
+                            if (sub.acquires() && !node.acquire(sub))
                             {
                                 sub.restoreCredit(node);
                             }
@@ -1263,7 +1263,7 @@
                             {
                                 deliverMessage(sub, node);
 
-                                if (sub.isBrowser())
+                                if (!sub.acquires())
                                 {
                                     QueueEntry newNode = _entries.next(node);
 

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
 Mon Aug 24 20:37:19 2009
@@ -59,7 +59,9 @@
 
     boolean isClosed();
 
-    boolean isBrowser();
+    boolean acquires();
+
+    boolean seesRequeues();
 
     void close();
 

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
 Mon Aug 24 20:37:19 2009
@@ -33,7 +33,6 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQMessage;
@@ -547,7 +546,7 @@
 
     public void restoreCredit(final QueueEntry queueEntry)
     {
-        _creditManager.addCredit(1, queueEntry.getSize());
+        _creditManager.restoreCredit(1, queueEntry.getSize());
     }
 
 
@@ -626,4 +625,16 @@
         ProtocolOutputConverter converter = 
getChannel().getProtocolSession().getProtocolOutputConverter();
         converter.confirmConsumerAutoClose(getChannel().getChannelId(), 
getConsumerTag());
     }
+
+    public boolean acquires()
+    {
+        return !isBrowser(); // every subscription except a browser acquires
+    }
+
+    public boolean seesRequeues()
+    {
+        return !isBrowser(); // same rule as acquires(): only browsers are excluded
+    }
+
+    abstract boolean isBrowser();
 }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
 Mon Aug 24 20:37:19 2009
@@ -22,11 +22,17 @@
 
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.FailedDequeueException;
+import org.apache.qpid.server.queue.MessageCleanupException;
 import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.CreditCreditManager;
+import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.transport.*;
@@ -35,10 +41,9 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.ArrayList;
 
-import sun.awt.X11.XSystemTrayPeer;
-
 public class Subscription_0_10 implements Subscription, 
FlowCreditManager.FlowCreditManagerListener
 {
     private final QueueEntry.SubscriptionAcquiredState _owningState = new 
QueueEntry.SubscriptionAcquiredState(this);
@@ -49,7 +54,7 @@
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
 
 
-    private FlowCreditManager _creditManager;
+    private FlowCreditManager_0_10 _creditManager;
 
 
     private StateListener _stateListener = new StateListener()
@@ -66,11 +71,14 @@
     private final FilterManager _filters;
     private final MessageAcceptMode _acceptMode;
     private final MessageAcquireMode _acquireMode;
+    private MessageFlowMode _flowMode;
     private final ServerSession _session;
+    private AtomicBoolean _stopped = new AtomicBoolean(true);
+    private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new 
ConcurrentHashMap<Integer, QueueEntry>();
 
 
     public Subscription_0_10(ServerSession session, String destination, 
MessageAcceptMode acceptMode,
-                             MessageAcquireMode acquireMode, FlowCreditManager 
creditManager, FilterManager filters)
+                             MessageAcquireMode acquireMode, 
FlowCreditManager_0_10 creditManager, FilterManager filters)
     {
         _session = session;
         _destination = destination;
@@ -159,7 +167,12 @@
 
     public boolean isBrowser()
     {
-        return false;  //To change body of implemented methods use File | 
Settings | File Templates.
+        return _acquireMode == MessageAcquireMode.NOT_ACQUIRED; // browser == subscription in NOT_ACQUIRED mode
+    }
+
+    public boolean seesRequeues()
+    {
+        return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode 
== MessageAcceptMode.EXPLICIT; // false only for NOT_ACQUIRED combined with a non-EXPLICIT accept mode
     }
 
     public void close()
@@ -218,7 +231,7 @@
     }
 
 
-    public void send(QueueEntry entry) throws AMQException
+    public void send(final QueueEntry entry) throws AMQException
     {
         ServerMessage serverMsg = entry.getMessage();
 
@@ -278,14 +291,78 @@
         deliveryProps.setRedelivered(entry.isRedelivered());
 
         newHeaders.add(deliveryProps);
-        xfr.setHeader(new Header(newHeaders));
+        xfr.setHeader(new Header(newHeaders));                
+
+        if(_acceptMode == MessageAcceptMode.NONE)
+        {
+            xfr.setCompletionListener(new 
MessageAcceptCompletionListener(this, _session, entry));
+        }
+
 
 
         _session.sendMessage(xfr);
 
+        if(_acceptMode == MessageAcceptMode.EXPLICIT)
+        {
+            // potential race condition if incoming commands on this session 
can be processed on a different thread
+            // to this one (i.e. the message is only put in the map *after* it 
has been sent, theoretically we could get
+            // acknowledgement back before reaching the next line)
+            _session.onMessageDispositionChange(xfr, new 
ServerSession.MessageDispositionChangeListener()
+                                        {
+                                            public void onAccept()
+                                            {
+                                                acknowledge(entry);
+                                            }
+
+                                            public void onRelease()
+                                            {
+                                                release(entry);
+                                            }
+
+                                            public void onReject()
+                                            {
+                                                reject(entry);
+                                            }
+            });
+        }
+        else
+        {
+            _session.onMessageDispositionChange(xfr, new 
ServerSession.MessageDispositionChangeListener()
+                                        {
+                                            public void onAccept()
+                                            {
+                                                // TODO : should log error of 
explicit accept on non-explicit sub
+                                            }
+
+                                            public void onRelease()
+                                            {
+                                                release(entry);
+                                            }
+
+                                            public void onReject()
+                                            {
+                                                reject(entry);
+                                            }
+
+            });
+        }
+
+
+    }
+
+    private void reject(QueueEntry entry)
+    {
+        entry.setRedelivered(true); // flag first, so the entry is already marked when it is rejected
+        entry.reject(this); // rejected on behalf of this subscription
 
     }
 
+    private void release(QueueEntry entry)
+    {
+        entry.setRedelivered(true); // flag as redelivered before the entry is released
+        entry.release();
+    }
+
     public void queueDeleted(AMQQueue queue)
     {
         _deleted.set(true);
@@ -308,7 +385,7 @@
 
     public void restoreCredit(QueueEntry queueEntry)
     {
-        _creditManager.addCredit(1, queueEntry.getSize());
+        _creditManager.restoreCredit(1, queueEntry.getSize());
     }
 
     public void setStateListener(StateListener listener)
@@ -342,20 +419,92 @@
     }
 
 
-    public FlowCreditManager getCreditManager()
+    public FlowCreditManager_0_10 getCreditManager()
     {
         return _creditManager;
     }
 
-    public void setCreditManager(FlowCreditManager creditManager)
+
+    public void stop()
+    {
+        if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+        {
+            _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+        }
+        _stopped.set(true);
+    }
+
+    public void addCredit(MessageCreditUnit unit, long value)
+    {
+        FlowCreditManager_0_10 creditManager = getCreditManager();
+
+        switch (unit)
+        {
+            case MESSAGE:
+
+                creditManager.addCredit(value, 0L);
+                break;
+            case BYTE:
+                creditManager.addCredit(0L, value);
+                break;
+        }
+
+        _stopped.set(false);
+
+        if(creditManager.hasCredit())
+        {
+            if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+            {
+                _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+            }
+        }
+
+    }
+
+    public void setFlowMode(MessageFlowMode flowMode)
     {
+
         _creditManager.removeListener(this);
 
-        _creditManager = creditManager;
+        switch(flowMode)
+        {
+            case CREDIT:
+                _creditManager = new CreditCreditManager(0l,0l);
+                break;
+            case WINDOW:
+                _creditManager = new WindowCreditManager(0l,0l);
+                break;
+            default:
+                throw new RuntimeException("Unknown message flow mode: " + flowMode);
+        }
+        _creditManager.addStateListener(this);
+    }
 
-        creditManager.addStateListener(this);
+    public boolean isStopped()
+    {
+        return _stopped.get();
+    }
 
+    public boolean acquires()
+    {
+        return _acquireMode == MessageAcquireMode.PRE_ACQUIRED;
     }
 
+    public void acknowledge(QueueEntry entry)
+    {
+        // TODO Fix Store Context / cleanup
 
+        try
+        {
+            entry.discard(new StoreContext());
+        }
+        catch (FailedDequeueException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+        catch (MessageCleanupException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+    }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Aug 24 20:37:19 2009
@@ -21,15 +21,31 @@
 package org.apache.qpid.server.transport;
 
 import org.apache.qpid.transport.*;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.AMQException;
 
-import java.util.ArrayList;
+import java.util.*;
+import java.util.concurrent.ConcurrentSkipListMap;
+import static org.apache.qpid.util.Serial.*;
 
 public class ServerSession extends Session
 {
+
+
+    public static interface MessageDispositionChangeListener
+    {
+        public void onAccept();
+
+        public void onRelease();
+
+        public void onReject();
+    }
+
+
+    private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
+            new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
+
     ServerSession(Connection connection, Binary name, long expiry)
     {
         super(connection, name, expiry);
@@ -58,8 +74,112 @@
         }
     }
 
+    
     public void sendMessage(MessageTransfer xfr)
     {
         invoke(xfr);
     }
+
+    public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
+    {
+        _messageDispositionListenerMap.put(xfr.getId(), acceptListener);
+    }
+
+
+    private static interface MessageDispositionAction
+    {
+        void performAction(MessageDispositionChangeListener  listener);
+    }
+
+    public void accept(RangeSet ranges)
+    {
+        dispositionChange(ranges, new MessageDispositionAction()
+                                      {
+                                          public void performAction(MessageDispositionChangeListener listener)
+                                          {
+                                              listener.onAccept();
+                                          }
+                                      });
+    }
+
+
+    public void release(RangeSet ranges)
+    {
+        dispositionChange(ranges, new MessageDispositionAction()
+                                      {
+                                          public void performAction(MessageDispositionChangeListener listener)
+                                          {
+                                              listener.onRelease();
+                                          }
+                                      });
+    }
+
+    public void reject(RangeSet ranges)
+    {
+        dispositionChange(ranges, new MessageDispositionAction()
+                                      {
+                                          public void performAction(MessageDispositionChangeListener listener)
+                                          {
+                                              listener.onReject();
+                                          }
+                                      });
+    }
+
+    public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
+    {
+        if(!_messageDispositionListenerMap.isEmpty())
+        {
+            Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
+            Iterator<Range> rangeIter = ranges.iterator();
+
+            if(rangeIter.hasNext())
+            {
+                Range range = rangeIter.next();
+
+                while(range != null && unacceptedMessages.hasNext())
+                {
+                    int next = unacceptedMessages.next();
+                    while(gt(next, range.getUpper()))
+                    {
+                        if(rangeIter.hasNext())
+                        {
+                            range = rangeIter.next();
+                        }
+                        else
+                        {
+                            range = null;
+                            break;
+                        }
+                    }
+                    if(range != null && range.includes(next))
+                    {
+                        MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
+                        action.performAction(changeListener);
+                    }
+
+
+                }
+
+            }
+
+
+        }
+    }
+
+    public void removeDispositionListener(Method method)
+    {
+        _messageDispositionListenerMap.remove(method.getId());
+    }
+
+    public void releaseAll()
+    {
+        for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
+        {
+            listener.onRelease();
+        }
+        _messageDispositionListenerMap.clear();
+    }
+
+
+
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Mon Aug 24 20:37:19 2009
@@ -27,9 +27,7 @@
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.flow.*;
 import org.apache.qpid.AMQException;
@@ -37,12 +35,11 @@
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;
-import java.nio.ByteBuffer;
 
 public class ServerSessionDelegate extends SessionDelegate
 {
     private final IApplicationRegistry _appRegistry;
-    private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>();
+    private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>();    
 
     public ServerSessionDelegate(IApplicationRegistry appRegistry)
     {
@@ -62,7 +59,7 @@
     @Override
     public void messageAccept(Session session, MessageAccept method)
     {
-        super.messageAccept(session, method);
+        ((ServerSession)session).accept(method.getTransfers());
     }
 
 
@@ -70,13 +67,13 @@
     @Override
     public void messageReject(Session session, MessageReject method)
     {
-        super.messageReject(session, method);
+        ((ServerSession)session).reject(method.getTransfers());
     }
 
     @Override
     public void messageRelease(Session session, MessageRelease method)
     {
-        super.messageRelease(session, method);
+        ((ServerSession)session).release(method.getTransfers());
     }
 
     @Override
@@ -102,7 +99,8 @@
 
         //TODO null check
 
-        FlowCreditManager creditManager = new MessageOnlyCreditManager(0L);
+
+        FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L);
 
         // TODO filters
 
@@ -439,6 +437,36 @@
         super.queueQuery(session, method);
     }
 
+    @Override
+    public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm)
+    {
+        String destination = sfm.getDestination();
+
+        Subscription_0_10 sub = _subscriptions.get(destination);
+
+        // TODO null check
+
+        if(sub.isStopped())
+        {
+            sub.setFlowMode(sfm.getFlowMode());
+        }
+
+        
+
+    }
+
+    @Override
+    public void messageStop(Session ssn, MessageStop stop)
+    {
+        String destination = stop.getDestination();
+
+        Subscription_0_10 sub = _subscriptions.get(destination);
+
+        // TODO null check
+
+        sub.stop();
+
+    }
 
     @Override
     public void messageFlow(Session ssn, MessageFlow flow)
@@ -447,12 +475,20 @@
 
         Subscription_0_10 sub = _subscriptions.get(destination);
 
-        FlowCreditManager creditManager = sub.getCreditManager();
+        // TODO null check
 
-        if(flow.getUnit() == MessageCreditUnit.MESSAGE)
+        sub.addCredit(flow.getUnit(), flow.getValue());
+
+    }
+
+    @Override
+    public void closed(Session session)
+    {
+        super.closed(session);
+        for(Subscription_0_10 sub : _subscriptions.values())
         {
-            creditManager.addCredit(flow.getValue(), 0L);
+            sub.close();
+            ((ServerSession)session).releaseAll();
         }
-
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Mon Aug 24 20:37:19 2009
@@ -119,6 +119,16 @@
         return _closed;
     }
 
+    public boolean acquires()
+    {
+        return true;
+    }
+
+    public boolean seesRequeues()
+    {
+        return true;
+    }
+
     public boolean isSuspended()
     {
         return false;

Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java Mon Aug 24 20:37:19 2009
@@ -35,6 +35,7 @@
 public abstract class Method extends Struct implements ProtocolEvent
 {
 
+
     public static final Method create(int type)
     {
         // XXX: should generate separate factories for separate
@@ -43,12 +44,18 @@
     }
 
     // XXX: command subclass?
+    public static interface CompletionListener
+    {
+        public void onComplete(Method method);
+    }
+
     private int id;
     private int channel;
     private boolean idSet = false;
     private boolean sync = false;
     private boolean batch = false;
     private boolean unreliable = false;
+    private CompletionListener completionListener;
 
     public final int getId()
     {
@@ -61,6 +68,11 @@
         this.idSet = true;
     }
 
+    boolean idSet()
+    {
+        return idSet;
+    }
+
     public final int getChannel()
     {
         return channel;
@@ -152,6 +164,21 @@
         }
     }
 
+
+    public void setCompletionListener(CompletionListener completionListener)
+    {
+        this.completionListener = completionListener;
+    }
+
+    public void complete()
+    {
+        if(completionListener!= null)
+        {
+            completionListener.onComplete(this);
+            completionListener = null;            
+        }
+    }
+
     public String toString()
     {
         StringBuilder str = new StringBuilder();

Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java Mon Aug 24 20:37:19 2009
@@ -52,6 +52,11 @@
         return ranges.getFirst();
     }
 
+    public Range getLast()
+    {
+        return ranges.getLast();
+    }
+
     public boolean includes(Range range)
     {
         for (Range r : this)

Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Mon Aug 24 20:37:19 2009
@@ -448,7 +448,7 @@
         }
     }
 
-    boolean complete(int lower, int upper)
+    protected boolean complete(int lower, int upper)
     {
         //avoid autoboxing
         if(log.isDebugEnabled())
@@ -465,8 +465,9 @@
                 if (m != null)
                 {
                     commandBytes -= m.getBodySize();
+                    m.complete();
+                    commands[idx] = null;                    
                 }
-                commands[idx] = null;
             }
             if (le(lower, maxComplete + 1))
             {
@@ -561,7 +562,8 @@
                           "(state=%s)", state));
                 }
 
-                int next = commandsOut++;
+                int next;
+                next = commandsOut++;
                 m.setId(next);
 
                 if (isFull(next))
@@ -918,6 +920,14 @@
                     }
                 }
             }
+            if(state == CLOSED)
+            {
+                delegate.closed(this);
+            }
+            else
+            {
+                delegate.detached(this);
+            }
         }
     }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Mon Aug 24 20:37:19 2009
@@ -184,4 +184,11 @@
         }
     }
 
+    public void closed(Session session)
+    {
+    }
+
+    public void detached(Session session)
+    {        
+    }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=807369&r1=807368&r2=807369&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Mon Aug 24 20:37:19 2009
@@ -210,6 +210,16 @@
         return false;
     }
 
+    public boolean acquires()
+    {
+        return true;
+    }
+
+    public boolean seesRequeues()
+    {
+        return true;
+    }
+
     public boolean isBrowser()
     {
         return false;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to