Author: rgodfrey
Date: Mon Jul 20 22:18:21 2009
New Revision: 796044

URL: http://svn.apache.org/viewvc?rev=796044&view=rev
Log:
Updated to get basic subscription functionality

Added:
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
Modified:
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
 Mon Jul 20 22:18:21 2009
@@ -33,6 +33,16 @@
         _bytesCredit = new AtomicLong(initialCredit);
     }
 
+    /** This manager keeps only a bytes counter, so the message credit is always reported as -1. */
+    public long getMessageCredit()
+    {
+        return -1L;
+    }
+
+    /** Returns the current value of the bytes credit counter. */
+    public long getBytesCredit()
+    {
+        return _bytesCredit.get();
+    }
+
     public void addCredit(long messageCredit, long bytesCredit)
     {
         _bytesCredit.addAndGet(bytesCredit);
@@ -71,4 +81,9 @@
         }
 
     }
+
+    /**
+     * Overwrites the bytes credit with an absolute value (addCredit, by contrast, adds a delta).
+     * NOTE(review): only the counter is written here; no suspension state is touched - confirm
+     * that callers do not expect a state-change notification.
+     */
+    public void setBytesCredit(long bytesCredit)
+    {
+        _bytesCredit.set( bytesCredit );
+    }
 }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
 Mon Jul 20 22:18:21 2009
@@ -25,6 +25,9 @@
 */
 public interface FlowCreditManager
 {
+    /** Returns the current message credit; implementations that keep no message counter (e.g. LimitlessCreditManager) return -1. */
+    long getMessageCredit();
+
+    /** Returns the current bytes credit; implementations that keep no bytes counter (e.g. LimitlessCreditManager) return -1. */
+    long getBytesCredit();
 
     public static interface FlowCreditManagerListener
     {

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
 Mon Jul 20 22:18:21 2009
@@ -24,6 +24,16 @@
 */
 public class LimitlessCreditManager extends AbstractFlowCreditManager 
implements FlowCreditManager
 {
+    /** No message counter is kept by this manager; always returns -1. */
+    public long getMessageCredit()
+    {
+        return -1L;
+    }
+
+    /** No bytes counter is kept by this manager; always returns -1. */
+    public long getBytesCredit()
+    {
+        return -1L;
+    }
+
     public void addCredit(long messageCredit, long bytesCredit)
     {
     }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
 Mon Jul 20 22:18:21 2009
@@ -27,14 +27,24 @@
     private long _messageCredit;
     private long _bytesCredit;
 
-    MessageAndBytesCreditManager(final long messageCredit, final long 
bytesCredit)
+    public MessageAndBytesCreditManager(final long messageCredit, final long 
bytesCredit)
     {
         _messageCredit = messageCredit;
         _bytesCredit = bytesCredit;
     }
 
-    public synchronized void addCredit(long messageCredit, long bytesCredit)
+    /** Returns the current message credit; synchronized on this manager like addCredit. */
+    public synchronized long getMessageCredit()
+    {
+        return _messageCredit;
+    }
+
+    /** Returns the current bytes credit; synchronized on this manager like addCredit. */
+    public synchronized long getBytesCredit()
     {
+        return _bytesCredit;
+    }
+
+    public synchronized void addCredit(long messageCredit, long bytesCredit)
+    {        
         _messageCredit += messageCredit;
         _bytesCredit += bytesCredit;
         setSuspended(hasCredit());
@@ -74,4 +84,9 @@
         }
         
     }
+
+    /**
+     * Overwrites the bytes credit with an absolute value.
+     * NOTE(review): unlike addCredit, this does not call setSuspended - confirm that is intended.
+     */
+    public synchronized void setBytesCredit(long bytesCredit)
+    {
+        _bytesCredit = bytesCredit;
+    }
 }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
 Mon Jul 20 22:18:21 2009
@@ -33,10 +33,21 @@
         _messageCredit = new AtomicLong(initialCredit);
     }
 
+    /** Returns the current value of the message credit counter. */
+    public long getMessageCredit()
+    {
+        return _messageCredit.get();
+    }
+
+    /** This manager keeps only a message counter, so the bytes credit is always reported as -1. */
+    public long getBytesCredit()
+    {
+        return -1L;
+    }
+
     public void addCredit(long messageCredit, long bytesCredit)
     {
-        setSuspended(false);
+        // bytesCredit is not used by this manager. The credit is added first so that the
+        // counter is already updated when setSuspended(false) runs.
         _messageCredit.addAndGet(messageCredit);
+        setSuspended(false);
+
     }
 
     public void removeAllCredit()
@@ -73,4 +84,5 @@
         }
                 
     }
+
 }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
 Mon Jul 20 22:18:21 2009
@@ -81,6 +81,16 @@
     }
 
 
+    /**
+     * Returns the current message credit.
+     * NOTE(review): read without synchronization although addCredit is synchronized - confirm
+     * that _messageCredit is declared volatile.
+     */
+    public long getMessageCredit()
+    {
+        return _messageCredit;
+    }
+
+    /**
+     * Returns the current bytes credit.
+     * NOTE(review): read without synchronization although addCredit is synchronized - confirm
+     * that _bytesCredit is declared volatile.
+     */
+    public long getBytesCredit()
+    {
+        return _bytesCredit;
+    }
+
     public synchronized void addCredit(final long messageCredit, final long 
bytesCredit)
     {
         final long messageCreditLimit = _messageCreditLimit;

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
 Mon Jul 20 22:18:21 2009
@@ -23,6 +23,7 @@
 import org.apache.qpid.transport.*;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.nio.ByteBuffer;
 
 
 public class MessageTransferMessage implements InboundMessage, ServerMessage
@@ -96,4 +97,17 @@
     {
         return _arrivalTime;
     }
+
+    /** Returns the header of the wrapped transfer (_xfr). */
+    public Header getHeader()
+    {
+        return _xfr.getHeader();
+
+    }
+
+    /**
+     * Returns the body buffer of the wrapped transfer (_xfr).
+     * NOTE(review): the buffer is returned as-is, not copied or duplicated - confirm that
+     * consumers do not disturb its position when several subscriptions read the same message.
+     */
+    public ByteBuffer getBody()
+    {
+        return _xfr.getBody();
+    }
+
+
 }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 Mon Jul 20 22:18:21 2009
@@ -160,6 +160,8 @@
 
     void setRedelivered(boolean b);
 
+    /** Returns whether this entry has been marked as redelivered (see setRedelivered). */
+    boolean isRedelivered();
+
     Subscription getDeliveredSubscription();
 
     void reject();

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 Mon Jul 20 22:18:21 2009
@@ -199,6 +199,11 @@
         _redelivered = b;
     }
 
+    /** Returns the flag last stored by setRedelivered. */
+    public boolean isRedelivered()
+    {
+        return _redelivered;
+    }
+
     public Subscription getDeliveredSubscription()
     {
             EntryState state = _state;

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Mon Jul 20 22:18:21 2009
@@ -1214,8 +1214,8 @@
                 {
                     unregisterSubscription(sub);
 
-                    ProtocolOutputConverter converter = 
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-                    
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), 
sub.getConsumerTag());
+                    sub.confirmAutoClose();
+
                 }
                 else if (!atTail)
                 {
@@ -1396,8 +1396,7 @@
                             {
                                 unregisterSubscription(sub);
 
-                                ProtocolOutputConverter converter = 
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-                                
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), 
sub.getConsumerTag());
+                                sub.confirmAutoClose();
                             }
                         }
                         else

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
 Mon Jul 20 22:18:21 2009
@@ -30,6 +30,7 @@
 {
 
 
+
     public static enum State
     {
         ACTIVE,
@@ -48,8 +49,6 @@
 
     void setQueue(AMQQueue queue);
 
-    AMQChannel getChannel();
-
     AMQShortString getConsumerTag();
 
     boolean isSuspended();
@@ -64,8 +63,6 @@
 
     void close();
 
-    boolean filtersMessages();
-
     void send(QueueEntry msg) throws AMQException;
 
     void queueDeleted(AMQQueue queue); 
@@ -74,9 +71,8 @@
     boolean wouldSuspend(QueueEntry msg);
 
     void getSendLock();
-    void releaseSendLock();
 
-    void resend(final QueueEntry entry) throws AMQException;
+    void releaseSendLock();
 
     void restoreCredit(final QueueEntry queueEntry);
 
@@ -91,6 +87,7 @@
 
     boolean isActive();
 
+    /**
+     * Invoked by SimpleAMQQueue right after it unregisters the subscription, in place of the
+     * former direct ProtocolOutputConverter.confirmConsumerAutoClose call, so that each
+     * protocol version can decide what (if anything) to send.
+     */
+    void confirmAutoClose();
 
 
 }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
 Mon Jul 20 22:18:21 2009
@@ -32,6 +32,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -374,6 +375,14 @@
 
     public boolean hasInterest(QueueEntry entry)
     {
+
+        // TODO 0-10 to 0-8 conversion
+        if(!(entry.getMessage() instanceof AMQMessage))
+        {
+            return false;
+        }
+
+
         //check that the message hasn't been rejected
         if (entry.isRejectedBy(this))
         {
@@ -516,11 +525,6 @@
         _stateChangeLock.unlock();
     }
 
-    public void resend(final QueueEntry entry) throws AMQException
-    {
-        _queue.resend(entry, this);
-    }
-
     public AMQChannel getChannel()
     {
         return _channel;
@@ -617,4 +621,9 @@
         return _owningState;
     }
 
+    /**
+     * Confirms the auto-close to the client through the channel's ProtocolOutputConverter,
+     * passing this subscription's channel id and consumer tag.
+     */
+    public void confirmAutoClose()
+    {
+        ProtocolOutputConverter converter = 
getChannel().getProtocolSession().getProtocolOutputConverter();
+        converter.confirmConsumerAutoClose(getChannel().getChannelId(), 
getConsumerTag());
+    }
 }

Added: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=796044&view=auto
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
 (added)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
 Mon Jul 20 22:18:21 2009
@@ -0,0 +1,361 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.*;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+
+import sun.awt.X11.XSystemTrayPeer;
+
+/**
+ * Subscription used by AMQP 0-10 sessions.
+ * <p>
+ * Entries are delivered by re-wrapping the stored {@link MessageTransferMessage} in a new
+ * {@link MessageTransfer} and handing it to the owning {@link ServerSession}. The
+ * ACTIVE/SUSPENDED state follows the credit notifications received from the
+ * {@link FlowCreditManager} this subscription listens to.
+ */
+public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener
+{
+    /** Acquired-state object created for this subscription and returned by getOwningState(). */
+    private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+
+    /** Held while closing; also exposed to callers through getSendLock()/releaseSendLock(). */
+    private final Lock _stateChangeLock = new ReentrantLock();
+
+    private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
+    private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
+    private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+    private FlowCreditManager _creditManager;
+
+
+    /** Starts as a no-op listener so state changes can be reported before a real one is set. */
+    private StateListener _stateListener = new StateListener()
+                                            {
+
+                                                public void stateChange(Subscription sub, State oldState, State newState)
+                                                {
+
+                                                }
+                                            };
+    private AMQQueue _queue;
+    private final String _destination;
+    /** Never assigned in this class, so it currently always holds false. */
+    private boolean _noLocal;
+    private final FilterManager _filters;
+    private final MessageAcceptMode _acceptMode;
+    private final MessageAcquireMode _acquireMode;
+    private final ServerSession _session;
+
+
+    /**
+     * Creates the subscription and registers it as a listener on the given credit manager.
+     *
+     * @param session       session that outgoing transfers are sent on
+     * @param destination   destination set on every outgoing transfer; also used as consumer tag
+     * @param acceptMode    accept mode set on every outgoing transfer
+     * @param acquireMode   acquire mode set on every outgoing transfer
+     * @param creditManager credit manager consulted by wouldSuspend(); must not be null
+     * @param filters       filters applied by hasInterest(); null means "no filtering"
+     */
+    public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
+                             MessageAcquireMode acquireMode, FlowCreditManager creditManager, FilterManager filters)
+    {
+        _session = session;
+        _destination = destination;
+        _acceptMode = acceptMode;
+        _acquireMode = acquireMode;
+        _creditManager = creditManager;
+        _filters = filters;
+        _creditManager.addStateListener(this);
+
+    }
+
+    /** Returns the queue set through setQueue(), or null if none has been set yet. */
+    public AMQQueue getQueue()
+    {
+        return _queue;
+    }
+
+    /** Returns the acquired-state object created for this subscription. */
+    public QueueEntry.SubscriptionAcquiredState getOwningState()
+    {
+        return _owningState;
+    }
+
+    /**
+     * Binds this subscription to a queue; may only be done once.
+     *
+     * @throws IllegalStateException if a queue has already been set
+     */
+    public void setQueue(AMQQueue queue)
+    {
+        if(getQueue() != null)
+        {
+            // a space was missing before "when", which ran the queue name into the next word
+            throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + " when already set to " + getQueue());
+        }
+        _queue = queue;
+    }
+
+    /** Returns the destination wrapped in a new AMQShortString (a fresh object on every call). */
+    public AMQShortString getConsumerTag()
+    {
+        return new AMQShortString(_destination);
+    }
+
+    /** Returns true when the subscription is not ACTIVE or its queue has been deleted. */
+    public boolean isSuspended()
+    {
+        return !isActive() || _deleted.get(); // TODO check for Session suspension
+    }
+
+    /**
+     * Decides whether the entry may be delivered to this subscription.
+     *
+     * @return false for messages that are not MessageTransferMessage, for entries this
+     *         subscription has rejected, and for entries the filters refuse; true otherwise
+     */
+    public boolean hasInterest(QueueEntry entry)
+    {
+
+        //TODO 0-8/9 to 0-10 conversion
+        if(!(entry.getMessage() instanceof MessageTransferMessage))
+        {
+            return false;
+        }
+
+        //check that the message hasn't been rejected
+        if (entry.isRejectedBy(this))
+        {
+
+            return false;
+        }
+
+
+
+        if (_noLocal)
+        {
+            // TODO no-local is not implemented; the branch is empty and _noLocal is never set
+
+        }
+
+
+        return checkFilters(entry);
+
+
+    }
+
+    /** Returns true when no filters are configured or all of them allow the entry's message. */
+    private boolean checkFilters(QueueEntry entry)
+    {
+        return (_filters == null) || _filters.allAllow(entry.getMessage());
+    }
+
+    public boolean isAutoClose()
+    {
+        // no such thing in 0-10
+        return false;
+    }
+
+    /** Returns true once the state has reached CLOSED. */
+    public boolean isClosed()
+    {
+        return getState() == State.CLOSED;
+    }
+
+    public boolean isBrowser()
+    {
+        // always false in this implementation
+        return false;
+    }
+
+    /**
+     * Moves the subscription to CLOSED, notifying the state listener when this call performs
+     * the transition, and then removes this subscription from the credit manager's listeners.
+     * The listener removal happens even when the subscription was already CLOSED.
+     */
+    public void close()
+    {
+        boolean closed = false;
+        State state = getState();
+
+        _stateChangeLock.lock();
+        try
+        {
+            // creditStateChanged() may change _state concurrently, so retry the CAS until
+            // this thread wins or another thread has closed the subscription
+            while(!closed && state != State.CLOSED)
+            {
+                closed = _state.compareAndSet(state, State.CLOSED);
+                if(!closed)
+                {
+                    state = getState();
+                }
+                else
+                {
+                    _stateListener.stateChange(this,state, State.CLOSED);
+                }
+            }
+            _creditManager.removeListener(this);
+        }
+        finally
+        {
+            _stateChangeLock.unlock();
+        }
+
+
+
+    }
+
+    /**
+     * Credit manager callback: switches between SUSPENDED and ACTIVE and notifies the state
+     * listener. A CLOSED subscription is never reactivated because both transitions are
+     * compare-and-set operations from an explicit expected state.
+     */
+    public void creditStateChanged(boolean hasCredit)
+    {
+
+        if(hasCredit)
+        {
+            if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+            {
+                _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+            }
+            else
+            {
+                // this is a hack to get round the issue of increasing bytes credit
+                _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
+            }
+        }
+        else
+        {
+            if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+            {
+                _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+            }
+        }
+    }
+
+
+    /**
+     * Sends the entry's message to the session as a new MessageTransfer.
+     * <p>
+     * All header structs other than DeliveryProperties are passed through unchanged. A new
+     * DeliveryProperties is built that copies delivery mode, exchange, expiration, priority
+     * and routing key when present on the original, and takes its redelivered flag from the
+     * queue entry.
+     * <p>
+     * The message is cast to MessageTransferMessage without a check; presumably callers only
+     * pass entries accepted by hasInterest() - otherwise a ClassCastException is thrown.
+     */
+    public void send(QueueEntry entry) throws AMQException
+    {
+        ServerMessage serverMsg = entry.getMessage();
+
+
+        MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
+
+
+
+        MessageTransfer xfr = new MessageTransfer();
+        xfr.setDestination(_destination);
+        xfr.setBody(msg.getBody());
+        xfr.setAcceptMode(_acceptMode);
+        xfr.setAcquireMode(_acquireMode);
+
+        Struct[] headers = msg.getHeader().getStructs();
+
+        ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
+        DeliveryProperties origDeliveryProps = null;
+        for(Struct header : headers)
+        {
+            if(header instanceof DeliveryProperties)
+            {
+                // held back: replaced below by a per-delivery copy
+                origDeliveryProps = (DeliveryProperties) header;
+            }
+            else
+            {
+                newHeaders.add(header);
+            }
+        }
+
+        DeliveryProperties deliveryProps = new DeliveryProperties();
+        if(origDeliveryProps != null)
+        {
+            if(origDeliveryProps.hasDeliveryMode())
+            {
+                deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
+            }
+            if(origDeliveryProps.hasExchange())
+            {
+                deliveryProps.setExchange(origDeliveryProps.getExchange());
+            }
+            if(origDeliveryProps.hasExpiration())
+            {
+                deliveryProps.setExpiration(origDeliveryProps.getExpiration());
+            }
+            if(origDeliveryProps.hasPriority())
+            {
+                deliveryProps.setPriority(origDeliveryProps.getPriority());
+            }
+            if(origDeliveryProps.hasRoutingKey())
+            {
+                deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
+            }
+
+        }
+
+        deliveryProps.setRedelivered(entry.isRedelivered());
+
+        newHeaders.add(deliveryProps);
+        xfr.setHeader(new Header(newHeaders));
+
+
+        _session.sendMessage(xfr);
+
+
+    }
+
+    /** Marks the subscription as deleted, which makes isSuspended() return true from now on. */
+    public void queueDeleted(AMQQueue queue)
+    {
+        _deleted.set(true);
+    }
+
+    /**
+     * Returns true when the credit manager's useCreditForMessage() returns false for the message.
+     * NOTE(review): judging by its name, useCreditForMessage consumes credit on success - confirm.
+     */
+    public boolean wouldSuspend(QueueEntry msg)
+    {
+        return !_creditManager.useCreditForMessage(msg.getMessage());
+    }
+
+    /** Acquires the state-change lock; must be paired with releaseSendLock(). */
+    public void getSendLock()
+    {
+        _stateChangeLock.lock();
+    }
+
+    /** Releases the lock taken by getSendLock(). */
+    public void releaseSendLock()
+    {
+        _stateChangeLock.unlock();
+    }
+
+    /** Gives back one message and the entry's size in bytes of credit to the credit manager. */
+    public void restoreCredit(QueueEntry queueEntry)
+    {
+        _creditManager.addCredit(1, queueEntry.getSize());
+    }
+
+    /** Replaces the listener that is told about state transitions. */
+    public void setStateListener(StateListener listener)
+    {
+        _stateListener = listener;
+    }
+
+    public State getState()
+    {
+        return _state.get();
+    }
+
+    /** Returns the queue entry last stored through setLastSeenEntry(), or null. */
+    public QueueEntry getLastSeenEntry()
+    {
+        return _queueContext.get();
+    }
+
+    /** Atomically replaces the last-seen entry if it still equals {@code expected}. */
+    public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
+    {
+        return _queueContext.compareAndSet(expected, newValue);
+    }
+
+    public boolean isActive()
+    {
+        return getState() == State.ACTIVE;
+    }
+
+    public void confirmAutoClose()
+    {
+        //No such thing in 0-10
+    }
+
+
+    public FlowCreditManager getCreditManager()
+    {
+        return _creditManager;
+    }
+
+    /**
+     * Swaps the credit manager: stops listening to the old one, then listens to the new one.
+     *
+     * @param creditManager the replacement; must not be null
+     */
+    public void setCreditManager(FlowCreditManager creditManager)
+    {
+        _creditManager.removeListener(this);
+
+        _creditManager = creditManager;
+
+        creditManager.addStateListener(this);
+
+    }
+
+
+}

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
 Mon Jul 20 22:18:21 2009
@@ -57,4 +57,9 @@
             e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
         }
     }
+
+    /** Issues the given transfer on this session by passing it to invoke(). */
+    public void sendMessage(MessageTransfer xfr)
+    {
+        invoke(xfr);
+    }
 }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
 Mon Jul 20 22:18:21 2009
@@ -30,13 +30,19 @@
 import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.flow.*;
 import org.apache.qpid.AMQException;
 
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.nio.ByteBuffer;
 
 public class ServerSessionDelegate extends SessionDelegate
 {
     private final IApplicationRegistry _appRegistry;
+    private Map<String, Subscription_0_10> _subscriptions = new 
HashMap<String, Subscription_0_10>();
 
     public ServerSessionDelegate(IApplicationRegistry appRegistry)
     {
@@ -76,7 +82,31 @@
     @Override
     public void messageSubscribe(Session session, MessageSubscribe method)
     {
-        super.messageSubscribe(session, method);
+        // Creates a Subscription_0_10 for the requested destination, remembers it in
+        // _subscriptions (keyed by destination) and registers it with the named queue.
+        String destination = method.getDestination();
+        String queueName = method.getQueue();
+        QueueRegistry queueRegistry = getQueueRegistry(session);
+
+        AMQQueue queue = queueRegistry.getQueue(queueName);
+
+        // TODO null check - presumably getQueue returns null for an unknown queue name, in
+        // which case queue.registerSubscription below throws NullPointerException
+
+        // starts with zero message credit; credit is granted later through messageFlow
+        FlowCreditManager creditManager = new MessageOnlyCreditManager(0L);
+
+        // TODO filters - null is passed as the FilterManager, so nothing is filtered
+
+        Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, 
destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, 
null);
+
+        // NOTE(review): an existing entry for the same destination is silently replaced
+        _subscriptions.put(destination, sub);
+        try
+        {
+            queue.registerSubscription(sub, method.getExclusive());
+        }
+        catch (AMQException e)
+        {
+            // TODO the failure is only printed: the peer is not told and the subscription
+            // stays in _subscriptions even though it was not registered
+            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
+        }        
+
     }
 
 
@@ -399,4 +429,21 @@
     {
         super.queueQuery(session, method);
     }
+
+
+    @Override
+    public void messageFlow(Session ssn, MessageFlow flow)
+    {
+        String destination = flow.getDestination();
+
+        Subscription_0_10 sub = _subscriptions.get(destination);
+
+        FlowCreditManager creditManager = sub.getCreditManager();
+
+        if(flow.getUnit() == MessageCreditUnit.MESSAGE)
+        {
+            creditManager.addCredit(flow.getValue(), 0L);
+        }
+
+    }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Jul 20 22:18:21 2009
@@ -338,6 +338,11 @@
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
+                public boolean isRedelivered()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
                 public Subscription getDeliveredSubscription()
                 {
                     return null;  //To change body of implemented methods use File | Settings | File Templates.

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Mon Jul 20 22:18:21 2009
@@ -177,7 +177,12 @@
 
     }
 
-    
+    public boolean isRedelivered()
+    {
+        return false;  
+    }
+
+
     public int compareTo(QueueEntry o)
     {
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Mon Jul 20 22:18:21 2009
@@ -99,6 +99,11 @@
         return true;
     }
 
+    public void confirmAutoClose()
+    {
+        
+    }
+
     public boolean isAutoClose()
     {
         return false;

Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Mon Jul 20 22:18:21 2009
@@ -146,6 +146,11 @@
         return false;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public void confirmAutoClose()
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public AMQQueue getQueue()
     {
         return null;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to