Author: rgodfrey
Date: Mon Jul 20 22:18:21 2009
New Revision: 796044
URL: http://svn.apache.org/viewvc?rev=796044&view=rev
Log:
Updated to get basic subscription functionality
Added:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
Mon Jul 20 22:18:21 2009
@@ -33,6 +33,16 @@
_bytesCredit = new AtomicLong(initialCredit);
}
+ public long getMessageCredit()
+ {
+ return -1L;
+ }
+
+ public long getBytesCredit()
+ {
+ return _bytesCredit.get();
+ }
+
public void addCredit(long messageCredit, long bytesCredit)
{
_bytesCredit.addAndGet(bytesCredit);
@@ -71,4 +81,9 @@
}
}
+
+ public void setBytesCredit(long bytesCredit)
+ {
+ _bytesCredit.set( bytesCredit );
+ }
}
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
Mon Jul 20 22:18:21 2009
@@ -25,6 +25,9 @@
*/
public interface FlowCreditManager
{
+ long getMessageCredit();
+
+ long getBytesCredit();
public static interface FlowCreditManagerListener
{
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
Mon Jul 20 22:18:21 2009
@@ -24,6 +24,16 @@
*/
public class LimitlessCreditManager extends AbstractFlowCreditManager
implements FlowCreditManager
{
+ public long getMessageCredit()
+ {
+ return -1L;
+ }
+
+ public long getBytesCredit()
+ {
+ return -1L;
+ }
+
public void addCredit(long messageCredit, long bytesCredit)
{
}
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
Mon Jul 20 22:18:21 2009
@@ -27,14 +27,24 @@
private long _messageCredit;
private long _bytesCredit;
- MessageAndBytesCreditManager(final long messageCredit, final long
bytesCredit)
+ public MessageAndBytesCreditManager(final long messageCredit, final long
bytesCredit)
{
_messageCredit = messageCredit;
_bytesCredit = bytesCredit;
}
- public synchronized void addCredit(long messageCredit, long bytesCredit)
+ public synchronized long getMessageCredit()
+ {
+ return _messageCredit;
+ }
+
+ public synchronized long getBytesCredit()
{
+ return _bytesCredit;
+ }
+
+ public synchronized void addCredit(long messageCredit, long bytesCredit)
+ {
_messageCredit += messageCredit;
_bytesCredit += bytesCredit;
setSuspended(hasCredit());
@@ -74,4 +84,9 @@
}
}
+
+ public synchronized void setBytesCredit(long bytesCredit)
+ {
+ _bytesCredit = bytesCredit;
+ }
}
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
Mon Jul 20 22:18:21 2009
@@ -33,10 +33,21 @@
_messageCredit = new AtomicLong(initialCredit);
}
+ public long getMessageCredit()
+ {
+ return _messageCredit.get();
+ }
+
+ public long getBytesCredit()
+ {
+ return -1L;
+ }
+
public void addCredit(long messageCredit, long bytesCredit)
{
- setSuspended(false);
_messageCredit.addAndGet(messageCredit);
+ setSuspended(false);
+
}
public void removeAllCredit()
@@ -73,4 +84,5 @@
}
}
+
}
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
Mon Jul 20 22:18:21 2009
@@ -81,6 +81,16 @@
}
+ public long getMessageCredit()
+ {
+ return _messageCredit;
+ }
+
+ public long getBytesCredit()
+ {
+ return _bytesCredit;
+ }
+
public synchronized void addCredit(final long messageCredit, final long
bytesCredit)
{
final long messageCreditLimit = _messageCreditLimit;
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
Mon Jul 20 22:18:21 2009
@@ -23,6 +23,7 @@
import org.apache.qpid.transport.*;
import java.util.concurrent.atomic.AtomicLong;
+import java.nio.ByteBuffer;
public class MessageTransferMessage implements InboundMessage, ServerMessage
@@ -96,4 +97,17 @@
{
return _arrivalTime;
}
+
+ public Header getHeader()
+ {
+ return _xfr.getHeader();
+
+ }
+
+ public ByteBuffer getBody()
+ {
+ return _xfr.getBody();
+ }
+
+
}
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Mon Jul 20 22:18:21 2009
@@ -160,6 +160,8 @@
void setRedelivered(boolean b);
+ boolean isRedelivered();
+
Subscription getDeliveredSubscription();
void reject();
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Mon Jul 20 22:18:21 2009
@@ -199,6 +199,11 @@
_redelivered = b;
}
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
+
public Subscription getDeliveredSubscription()
{
EntryState state = _state;
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Mon Jul 20 22:18:21 2009
@@ -1214,8 +1214,8 @@
{
unregisterSubscription(sub);
- ProtocolOutputConverter converter =
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(),
sub.getConsumerTag());
+ sub.confirmAutoClose();
+
}
else if (!atTail)
{
@@ -1396,8 +1396,7 @@
{
unregisterSubscription(sub);
- ProtocolOutputConverter converter =
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(),
sub.getConsumerTag());
+ sub.confirmAutoClose();
}
}
else
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
Mon Jul 20 22:18:21 2009
@@ -30,6 +30,7 @@
{
+
public static enum State
{
ACTIVE,
@@ -48,8 +49,6 @@
void setQueue(AMQQueue queue);
- AMQChannel getChannel();
-
AMQShortString getConsumerTag();
boolean isSuspended();
@@ -64,8 +63,6 @@
void close();
- boolean filtersMessages();
-
void send(QueueEntry msg) throws AMQException;
void queueDeleted(AMQQueue queue);
@@ -74,9 +71,8 @@
boolean wouldSuspend(QueueEntry msg);
void getSendLock();
- void releaseSendLock();
- void resend(final QueueEntry entry) throws AMQException;
+ void releaseSendLock();
void restoreCredit(final QueueEntry queueEntry);
@@ -91,6 +87,7 @@
boolean isActive();
+ void confirmAutoClose();
}
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
Mon Jul 20 22:18:21 2009
@@ -32,6 +32,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.AMQQueue;
@@ -374,6 +375,14 @@
public boolean hasInterest(QueueEntry entry)
{
+
+ // TODO 0-10 to 0-8 conversion
+ if(!(entry.getMessage() instanceof AMQMessage))
+ {
+ return false;
+ }
+
+
//check that the message hasn't been rejected
if (entry.isRejectedBy(this))
{
@@ -516,11 +525,6 @@
_stateChangeLock.unlock();
}
- public void resend(final QueueEntry entry) throws AMQException
- {
- _queue.resend(entry, this);
- }
-
public AMQChannel getChannel()
{
return _channel;
@@ -617,4 +621,9 @@
return _owningState;
}
+ public void confirmAutoClose()
+ {
+ ProtocolOutputConverter converter =
getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(getChannel().getChannelId(),
getConsumerTag());
+ }
}
Added:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=796044&view=auto
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
(added)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
Mon Jul 20 22:18:21 2009
@@ -0,0 +1,361 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.*;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+
+import sun.awt.X11.XSystemTrayPeer;
+
+public class Subscription_0_10 implements Subscription,
FlowCreditManager.FlowCreditManagerListener
+{
+ private final QueueEntry.SubscriptionAcquiredState _owningState = new
QueueEntry.SubscriptionAcquiredState(this);
+ private final Lock _stateChangeLock = new ReentrantLock();
+
+ private final AtomicReference<State> _state = new
AtomicReference<State>(State.ACTIVE);
+ private final AtomicReference<QueueEntry> _queueContext = new
AtomicReference<QueueEntry>(null);
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+ private FlowCreditManager _creditManager;
+
+
+ private StateListener _stateListener = new StateListener()
+ {
+
+ public void
stateChange(Subscription sub, State oldState, State newState)
+ {
+
+ }
+ };
+ private AMQQueue _queue;
+ private final String _destination;
+ private boolean _noLocal;
+ private final FilterManager _filters;
+ private final MessageAcceptMode _acceptMode;
+ private final MessageAcquireMode _acquireMode;
+ private final ServerSession _session;
+
+
+ public Subscription_0_10(ServerSession session, String destination,
MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode, FlowCreditManager
creditManager, FilterManager filters)
+ {
+ _session = session;
+ _destination = destination;
+ _acceptMode = acceptMode;
+ _acquireMode = acquireMode;
+ _creditManager = creditManager;
+ _filters = filters;
+ _creditManager.addStateListener(this);
+
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public QueueEntry.SubscriptionAcquiredState getOwningState()
+ {
+ return _owningState;
+ }
+
+ public void setQueue(AMQQueue queue)
+ {
+ if(getQueue() != null)
+ {
+ throw new IllegalStateException("Attempt to set queue for
subscription " + this + " to " + queue + "when already set to " + getQueue());
+ }
+ _queue = queue;
+ }
+
+ public AMQShortString getConsumerTag()
+ {
+ return new AMQShortString(_destination);
+ }
+
+ public boolean isSuspended()
+ {
+ return !isActive() || _deleted.get(); // TODO check for Session
suspension
+ }
+
+ public boolean hasInterest(QueueEntry entry)
+ {
+
+ //TODO 0-8/9 to 0-10 conversion
+ if(!(entry.getMessage() instanceof MessageTransferMessage))
+ {
+ return false;
+ }
+
+ //check that the message hasn't been rejected
+ if (entry.isRejectedBy(this))
+ {
+
+ return false;
+ }
+
+
+
+ if (_noLocal)
+ {
+
+
+ }
+
+
+ return checkFilters(entry);
+
+
+ }
+
+ private boolean checkFilters(QueueEntry entry)
+ {
+ return (_filters == null) || _filters.allAllow(entry.getMessage());
+ }
+
+ public boolean isAutoClose()
+ {
+ // no such thing in 0-10
+ return false;
+ }
+
+ public boolean isClosed()
+ {
+ return getState() == State.CLOSED;
+ }
+
+ public boolean isBrowser()
+ {
+ return false; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public void close()
+ {
+ boolean closed = false;
+ State state = getState();
+
+ _stateChangeLock.lock();
+ try
+ {
+ while(!closed && state != State.CLOSED)
+ {
+ closed = _state.compareAndSet(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ else
+ {
+ _stateListener.stateChange(this,state, State.CLOSED);
+ }
+ }
+ _creditManager.removeListener(this);
+ }
+ finally
+ {
+ _stateChangeLock.unlock();
+ }
+
+
+
+ }
+
+ public void creditStateChanged(boolean hasCredit)
+ {
+
+ if(hasCredit)
+ {
+ if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ {
+ _stateListener.stateChange(this, State.SUSPENDED,
State.ACTIVE);
+ }
+ else
+ {
+ // this is a hack to get round the issue of increasing bytes
credit
+ _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
+ }
+ }
+ else
+ {
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE,
State.SUSPENDED);
+ }
+ }
+ }
+
+
+ public void send(QueueEntry entry) throws AMQException
+ {
+ ServerMessage serverMsg = entry.getMessage();
+
+
+ MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
+
+
+
+ MessageTransfer xfr = new MessageTransfer();
+ xfr.setDestination(_destination);
+ xfr.setBody(msg.getBody());
+ xfr.setAcceptMode(_acceptMode);
+ xfr.setAcquireMode(_acquireMode);
+
+ Struct[] headers = msg.getHeader().getStructs();
+
+ ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
+ DeliveryProperties origDeliveryProps = null;
+ for(Struct header : headers)
+ {
+ if(header instanceof DeliveryProperties)
+ {
+ origDeliveryProps = (DeliveryProperties) header;
+ }
+ else
+ {
+ newHeaders.add(header);
+ }
+ }
+
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ if(origDeliveryProps != null)
+ {
+ if(origDeliveryProps.hasDeliveryMode())
+ {
+
deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
+ }
+ if(origDeliveryProps.hasExchange())
+ {
+ deliveryProps.setExchange(origDeliveryProps.getExchange());
+ }
+ if(origDeliveryProps.hasExpiration())
+ {
+ deliveryProps.setExpiration(origDeliveryProps.getExpiration());
+ }
+ if(origDeliveryProps.hasPriority())
+ {
+ deliveryProps.setPriority(origDeliveryProps.getPriority());
+ }
+ if(origDeliveryProps.hasRoutingKey())
+ {
+ deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
+ }
+
+ }
+
+ deliveryProps.setRedelivered(entry.isRedelivered());
+
+ newHeaders.add(deliveryProps);
+ xfr.setHeader(new Header(newHeaders));
+
+
+ _session.sendMessage(xfr);
+
+
+ }
+
+ public void queueDeleted(AMQQueue queue)
+ {
+ _deleted.set(true);
+ }
+
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return !_creditManager.useCreditForMessage(msg.getMessage());
+ }
+
+ public void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+ public void restoreCredit(QueueEntry queueEntry)
+ {
+ _creditManager.addCredit(1, queueEntry.getSize());
+ }
+
+ public void setStateListener(StateListener listener)
+ {
+ _stateListener = listener;
+ }
+
+ public State getState()
+ {
+ return _state.get();
+ }
+
+ public QueueEntry getLastSeenEntry()
+ {
+ return _queueContext.get();
+ }
+
+ public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
+ {
+ return _queueContext.compareAndSet(expected, newValue);
+ }
+
+ public boolean isActive()
+ {
+ return getState() == State.ACTIVE;
+ }
+
+ public void confirmAutoClose()
+ {
+ //No such thing in 0-10
+ }
+
+
+ public FlowCreditManager getCreditManager()
+ {
+ return _creditManager;
+ }
+
+ public void setCreditManager(FlowCreditManager creditManager)
+ {
+ _creditManager.removeListener(this);
+
+ _creditManager = creditManager;
+
+ creditManager.addStateListener(this);
+
+ }
+
+
+}
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
Mon Jul 20 22:18:21 2009
@@ -57,4 +57,9 @@
e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
}
}
+
+ public void sendMessage(MessageTransfer xfr)
+ {
+ invoke(xfr);
+ }
}
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
Mon Jul 20 22:18:21 2009
@@ -30,13 +30,19 @@
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.flow.*;
import org.apache.qpid.AMQException;
import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.nio.ByteBuffer;
public class ServerSessionDelegate extends SessionDelegate
{
private final IApplicationRegistry _appRegistry;
+ private Map<String, Subscription_0_10> _subscriptions = new
HashMap<String, Subscription_0_10>();
public ServerSessionDelegate(IApplicationRegistry appRegistry)
{
@@ -76,7 +82,31 @@
@Override
public void messageSubscribe(Session session, MessageSubscribe method)
{
- super.messageSubscribe(session, method);
+ String destination = method.getDestination();
+ String queueName = method.getQueue();
+ QueueRegistry queueRegistry = getQueueRegistry(session);
+
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+
+ //TODO null check
+
+ FlowCreditManager creditManager = new MessageOnlyCreditManager(0L);
+
+ // TODO filters
+
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
destination,method.getAcceptMode(),method.getAcquireMode(), creditManager,
null);
+
+ _subscriptions.put(destination, sub);
+ try
+ {
+ queue.registerSubscription(sub, method.getExclusive());
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
+ }
+
}
@@ -399,4 +429,21 @@
{
super.queueQuery(session, method);
}
+
+
+ @Override
+ public void messageFlow(Session ssn, MessageFlow flow)
+ {
+ String destination = flow.getDestination();
+
+ Subscription_0_10 sub = _subscriptions.get(destination);
+
+ FlowCreditManager creditManager = sub.getCreditManager();
+
+ if(flow.getUnit() == MessageCreditUnit.MESSAGE)
+ {
+ creditManager.addCredit(flow.getValue(), 0L);
+ }
+
+ }
}
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
Mon Jul 20 22:18:21 2009
@@ -338,6 +338,11 @@
//To change body of implemented methods use File |
Settings | File Templates.
}
+ public boolean isRedelivered()
+ {
+ return false; //To change body of implemented methods use
File | Settings | File Templates.
+ }
+
public Subscription getDeliveredSubscription()
{
return null; //To change body of implemented methods use
File | Settings | File Templates.
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
Mon Jul 20 22:18:21 2009
@@ -177,7 +177,12 @@
}
-
+ public boolean isRedelivered()
+ {
+ return false;
+ }
+
+
public int compareTo(QueueEntry o)
{
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
Mon Jul 20 22:18:21 2009
@@ -99,6 +99,11 @@
return true;
}
+ public void confirmAutoClose()
+ {
+
+ }
+
public boolean isAutoClose()
{
return false;
Modified:
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=796044&r1=796043&r2=796044&view=diff
==============================================================================
---
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
(original)
+++
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Mon Jul 20 22:18:21 2009
@@ -146,6 +146,11 @@
return false; //To change body of implemented methods use File |
Settings | File Templates.
}
+ public void confirmAutoClose()
+ {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
public AMQQueue getQueue()
{
return null;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]